# **Day 1 (chapters 1 & 2 from Spark: The Definitive Guide)**

1. What is big data?

   Big data is a collection of data that is huge in volume and keeps growing exponentially over time. Its size and complexity are so great that traditional data management tools cannot store or process it efficiently. In other words, big data is still data, just at a much larger scale.

2. Why Spark?

   Apache Spark is a popular open-source framework for big data processing and analysis. It is chosen because of its:

*   Fast processing speed.
*   In-memory processing capabilities.
*   Easy-to-use APIs in multiple programming languages.
*   Support for batch, real-time streaming and interactive processing.
*   Built-in libraries for machine learning, graph processing and SQL.
*   Ability to run on a cluster of computers, making it highly scalable and fault-tolerant.

3. What is Spark?

 Apache Spark is a data processing framework that can quickly perform processing tasks on very large data sets, and can also distribute data processing tasks across multiple computers, either on its own or in tandem with other distributed computing tools. These two qualities are key to the worlds of big data and machine learning, which require the marshalling of massive computing power to crunch through large data stores. Spark also takes some of the programming burdens of these tasks off the shoulders of developers with an easy-to-use API that abstracts away much of the grunt work of distributed computing and big data processing.

4. Internals of Spark

   Spark's internal architecture is based on the following components:

*   Resilient Distributed Datasets (RDDs): Spark's basic data abstraction, an immutable collection of records split into partitions that can be processed in parallel.

*   Cluster Manager: Allocates cluster resources (machines, CPU cores, memory) to Spark applications. Spark supports several cluster managers: its own standalone manager, Apache Mesos, Hadoop YARN and Kubernetes.

*   Driver Program: Runs the application's main function, creates the SparkSession/SparkContext, defines RDDs and their transformations, and coordinates the tasks launched on the cluster.

*   Executors: Processes running on worker nodes that execute the tasks assigned to them by the driver.

*   Task: The smallest unit of work; each task processes one partition of data and runs on a single executor.

*   Shuffle: The process of redistributing data across partitions, required by operations such as joins and groupings.

*   DAG Scheduler: Splits each job into stages at shuffle boundaries and tracks the dependencies between those stages to build an optimized execution plan.

*   Task Scheduler: Receives the tasks of each stage from the DAG Scheduler and launches them on executors according to resource availability.

*   Block Manager: Manages the storage and retrieval of RDD partitions and other data blocks in memory and on disk.
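The DAG Scheduler's job of cutting work into stages can be illustrated with a small plain-Python model. This is a sketch under simplifying assumptions, not Spark's implementation: it treats a job as a linear chain of operation names, and the hypothetical `WIDE_OPS` set stands in for the operations that need a shuffle. Narrow operations are pipelined into the current stage, and each wide operation starts a new one.

```python
# Hypothetical classification: these operations need a shuffle (wide dependency).
WIDE_OPS = {"groupBy", "join", "repartition", "distinct", "orderBy"}


def split_into_stages(ops):
    """Group a linear chain of operations into stages.

    Narrow operations are appended to the current stage (pipelined);
    a wide operation closes the current stage, because its input must
    first be shuffled across partitions.
    """
    stages = [[]]
    for op in ops:
        if op in WIDE_OPS and stages[-1]:
            stages.append([])  # shuffle boundary: start a new stage
        stages[-1].append(op)
    return stages


plan = ["read", "filter", "select", "groupBy", "agg", "orderBy"]
for i, stage in enumerate(split_into_stages(plan)):
    print(f"stage {i}: {stage}")
```

With this plan the model prints three stages: the read/filter/select pipeline, the grouped aggregation, and the final sort.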



5. High-level APIs of Spark: SparkSession, DataFrame, Partitions, Transformations, Actions, Lazy Evaluation

   **SparkSession**

   Introduced in Spark 2.0, SparkSession is the entry point to PySpark functionality: it is used to create DataFrames and, through `spark.sparkContext`, RDDs. In the pyspark shell a SparkSession object named `spark` is available by default; in a program it is created with `SparkSession.builder`.

  **DataFrame**

  A DataFrame is a Dataset organized into named columns.
*   It is conceptually equivalent to a table in a relational database or a data frame in R/Python.

*   To use your own functions (UDFs) in SQL expressions, you need to “register” them with Spark.

*   DataFrame is untyped, i.e., types are checked at runtime rather than at compile time.
*   DataFrames are generally more performance-optimal than typed Datasets.

 **Partitions**

 To process DataFrames/Datasets in parallel, Spark breaks the data up into chunks called partitions, each of which is processed within an executor.

*   Users usually do not manipulate partitions manually or individually; they simply specify high-level transformations of the data, and Spark applies them to the physical partitions.
*   Spark determines how this work will actually execute on the cluster.
*   Users can specify the number of partitions and repartition the collection if necessary.
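Partitioning by a key can be modelled in plain Python. The sketch assumes hash partitioning (each row goes to partition `hash(key) % n`), which is conceptually what `repartition(n, col)` does. `hash_partition` is a hypothetical helper; it uses `zlib.crc32` only so the result is deterministic, whereas Spark uses its own Murmur3-based hash, so real partition numbers will differ. The sample rows are the first five rows of `2015-summary.json` shown later in this notebook.

```python
import zlib


def hash_partition(rows, key, num_partitions):
    """Place each row in partition crc32(row[key]) % num_partitions.

    Rows that share a key always land in the same partition, which is
    what lets a later groupBy or join on that key work partition by partition.
    """
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        digest = zlib.crc32(str(row[key]).encode("utf-8"))
        partitions[digest % num_partitions].append(row)
    return partitions  # some partitions may be empty


rows = [
    {"DEST_COUNTRY_NAME": "United States", "ORIGIN_COUNTRY_NAME": "Romania", "count": 15},
    {"DEST_COUNTRY_NAME": "United States", "ORIGIN_COUNTRY_NAME": "Croatia", "count": 1},
    {"DEST_COUNTRY_NAME": "United States", "ORIGIN_COUNTRY_NAME": "Ireland", "count": 344},
    {"DEST_COUNTRY_NAME": "Egypt", "ORIGIN_COUNTRY_NAME": "United States", "count": 15},
    {"DEST_COUNTRY_NAME": "United States", "ORIGIN_COUNTRY_NAME": "India", "count": 62},
]
for i, part in enumerate(hash_partition(rows, "DEST_COUNTRY_NAME", 3)):
    print(i, [r["ORIGIN_COUNTRY_NAME"] for r in part])
```

All four "United States" rows end up in the same partition; which index that is depends only on the hash.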

 **Transformations and Actions**

 Spark DataFrames/Datasets support two types of operations: transformations and actions.

 **Transformations** are operations on DataFrames/Datasets that return a new DataFrame/Dataset.
 E.g., select(), filter(), groupBy() (combined with an aggregation) and map() (on Datasets/RDDs).

 **Actions** are operations that return a result to the driver program or write it to storage, and thereby kick off a computation.
 E.g., show(), count() and first().

 **Return type difference:** transformations return DataFrames/Datasets, whereas actions return some other data type.
 Spark treats the two kinds of operations very differently.

 **Lazy Evaluation**

 Transformations are lazily evaluated, meaning that Spark will not begin to execute them until it sees an action. Instead, Spark internally records metadata indicating which transformation operations have been requested.
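The difference between recording transformations and running them on an action can be shown with a toy model. `LazyFrame` below is a hypothetical plain-Python class, not a Spark API: its transformations only append a step to a plan, and its actions (`collect()`, `count()`) are the only methods that touch the data. The values are the first five `count` values of the flight data.

```python
class LazyFrame:
    """Toy model of lazy evaluation (hypothetical, not a Spark API)."""

    def __init__(self, source, plan=()):
        self._source = source
        self._plan = tuple(plan)

    # Transformations: return a new LazyFrame and do no work.
    def filter(self, predicate):
        return LazyFrame(self._source, self._plan + (("filter", predicate),))

    def map(self, fn):
        return LazyFrame(self._source, self._plan + (("map", fn),))

    def explain(self):
        """Show the recorded plan without executing it."""
        return [kind for kind, _ in self._plan]

    # Actions: execute the recorded plan over the source data.
    def collect(self):
        rows = list(self._source)
        for kind, fn in self._plan:
            if kind == "filter":
                rows = [r for r in rows if fn(r)]
            else:  # "map"
                rows = [fn(r) for r in rows]
        return rows

    def count(self):
        return len(self.collect())


counts = [15, 1, 344, 15, 62]
lf = LazyFrame(counts).filter(lambda c: c < 100).map(lambda c: c * 2)
print(lf.explain())  # ['filter', 'map']: only the plan exists so far
print(lf.collect())  # [30, 2, 30, 124]
```

Building `lf` runs neither lambda; they are called only when `collect()` or `count()` executes the plan.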



In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=2f1734fe8226d55a1c5eb04661cf4d97a3d432af9343f163f54477314a9abbf1
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession that the later cells refer to as `spark`
spark = SparkSession.builder.appName("Assignment").getOrCreate()

# **Day 2 (chapters 4 & 5)**

**1. Structured API - DataFrames, SQL and Datasets**

   Spark SQL is a library for structured data processing. It provides a SQL-like API on top of the Spark stack, supporting relational data processing and SQL literal syntax for operating on data.

 Like an RDD, a DataFrame or Dataset is an immutable distributed collection of data. Unlike an RDD, the data in a DataFrame or Dataset is organized into named columns, like a table in a relational database.

 A DataFrame allows developers to impose a structure onto a distributed collection of data, enabling a higher-level abstraction; it also provides a domain-specific language (DSL) API.

# 2. Basic Structured Operations

# 2.1 Schema

In [None]:
spark.read.format("json")\
.load("2015-summary.json")\
.schema

StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), StructField('ORIGIN_COUNTRY_NAME', StringType(), True), StructField('count', LongType(), True)])

In [None]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False),
])

# Note: file-based sources such as JSON read every column as nullable,
# so printSchema() still reports count as nullable = true
df = spark.read.format("json")\
    .schema(myManualSchema)\
    .load("2015-summary.json")
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



# 2.2 Columns and Expressions

In [None]:
from pyspark.sql.functions import col, column
df.select(col("count")).show()

+-----+
|count|
+-----+
|   15|
|    1|
|  344|
|   15|
|   62|
|    1|
|   62|
|  588|
|   40|
|    1|
|  325|
|   39|
|   64|
|    1|
|   41|
|   30|
|    6|
|    4|
|  230|
|    1|
+-----+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import expr
expr("(((count + 5) * 200) - 6) < count")

Column<'((((count + 5) * 200) - 6) < count)'>

In [None]:
spark.read.format("json")\
.load("2015-summary.json")\
.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']


# 2.3 Records and Rows

In [None]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

# 2.4 Create Rows


In [None]:
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)

In [None]:
myRow[0]


'Hello'

In [None]:
myRow[2]

1

# 2.6 Creating DataFrames

In [None]:
df = spark.read.format("JSON")\
.load("2015-summary.json")

In [None]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", LongType(), False),
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

+-----+----+----+
| col1|col2|col3|
+-----+----+----+
|Hello|null|   1|
+-----+----+----+



# 2.7 select and selectExpr


In [None]:
df.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [None]:
df.select(
"DEST_COUNTRY_NAME",
"ORIGIN_COUNTRY_NAME")\
.show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [None]:
from pyspark.sql.functions import expr, col, column
df.select(
expr("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"))\
.show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



In [None]:
df.select(col("DEST_COUNTRY_NAME"), "DEST_COUNTRY_NAME").show(2)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+
|    United States|    United States|
|    United States|    United States|
+-----------------+-----------------+
only showing top 2 rows



In [None]:
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [None]:
df.select(
expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME")
).show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [None]:
df.selectExpr(
"DEST_COUNTRY_NAME as newColumnName",
"DEST_COUNTRY_NAME"
).show(2)

+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [None]:
df.selectExpr(
"*", 
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry" )\
.show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [None]:
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



In [None]:
from pyspark.sql.functions import lit
df.select(
expr("*"),
lit(1).alias("One")
).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



# 2.8 Adding columns

In [None]:
df.withColumn("numberOne", lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [None]:
df.withColumn(
"withinCountry",
expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
.show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



# 2.9 Renaming columns

In [None]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

In [None]:
dfWithLongColName = df\
.withColumn(
"This Long Column-Name",
expr("ORIGIN_COUNTRY_NAME"))
dfWithLongColName.show(2)

+-----------------+-------------------+-----+---------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|This Long Column-Name|
+-----------------+-------------------+-----+---------------------+
|    United States|            Romania|   15|              Romania|
|    United States|            Croatia|    1|              Croatia|
+-----------------+-------------------+-----+---------------------+
only showing top 2 rows



In [None]:
dfWithLongColName\
.selectExpr(
"`This Long Column-Name`",
"`This Long Column-Name` as `new col`" )\
.show(2)

+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows



In [None]:
dfWithLongColName.select(expr("`This Long Column-Name`")).columns

['This Long Column-Name']

In [None]:
df.drop("ORIGIN_COUNTRY_NAME").columns

['DEST_COUNTRY_NAME', 'count']

In [None]:
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")

DataFrame[count: bigint, This Long Column-Name: string]

# 2.10 Changing a column's type


In [None]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [None]:
df.withColumn("count",col("count").cast("int")).printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



# 2.11 Filtering rows


In [None]:
colCondition = df.filter(col("count") < 2).take(2)
conditional = df.where("count < 2").take(2)

In [None]:
print(colCondition)
print(conditional)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]


In [None]:
df.where(col("count") < 2)\
.where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
.show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



# 2.12 Getting unique rows

In [None]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

256

In [None]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count()

125

# 2.13 Sorting


In [None]:
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--

In [None]:
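from pyspark.sql.functions import desc, asc

# Caution: expr("count desc") is parsed as `count AS desc` (an alias), so the
# first sort is actually ascending, as its output shows; use col("count").desc()
# or desc("count") for a descending sort, as in the second line
df.orderBy(expr("count desc")).show(2)
df.orderBy(desc(col("count")), asc(col("DEST_COUNTRY_NAME"))).show(2)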

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows



# 2.14 Limit


In [None]:
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



In [None]:
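# expr("count desc") makes "desc" an alias rather than a sort direction,
# so these rows come out in ascending order of count
df.orderBy(expr("count desc")).limit(6).show()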

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
+--------------------+-------------------+-----+



# 2.15 Repartition and Coalesce

In [None]:
df.rdd.getNumPartitions()

1

In [None]:
df.repartition(5)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [None]:
df.repartition(col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [None]:
df.repartition(5, col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [None]:
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]
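Each call above only returns a new DataFrame; the difference between them lies in how rows move. A rough plain-Python model, with hypothetical helpers `repartition_model` and `coalesce_model`: `repartition(n)` without columns redistributes every row round-robin, which requires a full shuffle, while `coalesce(n)` only merges existing partitions and never increases their number. Spark's real coalescing also considers where partitions live, which this sketch ignores.

```python
def repartition_model(partitions, n):
    """Redistribute every row round-robin into n partitions (full shuffle)."""
    rows = [row for part in partitions for row in part]
    out = [[] for _ in range(n)]
    for i, row in enumerate(rows):
        out[i % n].append(row)
    return out


def coalesce_model(partitions, n):
    """Merge adjacent existing partitions into n larger ones (no reshuffle).

    Requesting more partitions than exist leaves the count unchanged.
    """
    if n >= len(partitions):
        return [list(p) for p in partitions]
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i * n // len(partitions)].extend(part)  # whole partitions move together
    return merged


five = repartition_model([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]], 5)
print(five)                     # [[1, 6], [2, 7], [3, 8], [4, 9], [5, 10]]
print(coalesce_model(five, 2))  # [[1, 6, 2, 7, 3, 8], [4, 9, 5, 10]]
```

On the real DataFrame, `df.repartition(5).coalesce(2).rdd.getNumPartitions()` reports 2.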

# **Day 3 (chapter 7)**

# 1. Aggregation Functions


# 1.1 count


In [None]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").count()

256

# 1.2 countDistinct

In [None]:
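# Same result as the aggregate countDistinct("ORIGIN_COUNTRY_NAME") when the
# column has no nulls (countDistinct ignores nulls, distinct() keeps them)
df.select("ORIGIN_COUNTRY_NAME").distinct().count()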

125

# 1.3 first and last

In [None]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [None]:
df.tail(1)

[Row(DEST_COUNTRY_NAME='Greece', ORIGIN_COUNTRY_NAME='United States', count=30)]

# 1.4 min and max


In [None]:
from pyspark.sql.functions import min
df.select(min("count")).take(1)

[Row(min(count)=1)]

In [None]:
from pyspark.sql.functions import max
df.select(max("count")).take(1)

[Row(max(count)=370002)]

# 1.5 sum

In [None]:
from pyspark.sql.functions import sum
df.select(sum("count")).show()

+----------+
|sum(count)|
+----------+
|    453316|
+----------+



# 1.6 sum_distinct (formerly sumDistinct)

In [None]:
from pyspark.sql.functions import sum_distinct

df.select(sum_distinct("count")).show()

+-------------------+
|sum(DISTINCT count)|
+-------------------+
|             450718|
+-------------------+



In [None]:
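# Import only avg: `from pyspark.sql.functions import *` would shadow Python
# built-ins such as sum, min, max and round
from pyspark.sql.functions import avg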

df.select(avg("count")).show()

+-----------+
| avg(count)|
+-----------+
|1770.765625|
+-----------+



# 1.7 Grouping

In [None]:
df1= df.groupBy("DEST_COUNTRY_NAME").count()
df2 = df.groupBy("ORIGIN_COUNTRY_NAME").count()
df3 = df.groupBy("count").count()

In [None]:
df1.show(3)
df2.show(3)
df3.show(3)

+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|         Anguilla|    1|
|           Russia|    1|
|         Paraguay|    1|
+-----------------+-----+
only showing top 3 rows

+-------------------+-----+
|ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+
|           Paraguay|    1|
|             Russia|    1|
|           Anguilla|    1|
+-------------------+-----+
only showing top 3 rows

+-----+-----+
|count|count|
+-----+-----+
|   26|    2|
|  442|    1|
|   19|    3|
+-----+-----+
only showing top 3 rows



# 1.8 Window Functions

In [None]:
simpleData = (
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
)

columns = ["employee_name", "department", "salary"]
# Note: this reassigns df, replacing the flight-data DataFrame used above
df = spark.createDataFrame(data=simpleData, schema=columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [None]:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("department").orderBy("salary")

df.withColumn("row_number",row_number().over(windowSpec)) \
    .show(truncate=False)


+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
+-------------+----------+------+----------+



In [None]:
from pyspark.sql.functions import rank
df.withColumn("rank",rank().over(windowSpec)) \
    .show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
+-------------+----------+------+----+



In [None]:
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .show()

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         2|
|      Michael|     Sales|  4600|         3|
+-------------+----------+------+----------+
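The three ranking functions differ only in how they treat ties. The hypothetical helper `ranking_columns` reproduces them in plain Python for one window partition (salaries sorted ascending, like `windowSpec`): `row_number` counts rows, `rank` gives ties the same value and then skips ahead, and `dense_rank` gives ties the same value without leaving gaps. The Sales salaries come from `simpleData`.

```python
def ranking_columns(salaries):
    """Return (salary, row_number, rank, dense_rank) tuples for one window
    partition, ordered ascending like Window.orderBy("salary")."""
    ordered = sorted(salaries)
    results = []
    rank = dense = 0
    previous = None
    for position, salary in enumerate(ordered, start=1):
        if position == 1 or salary != previous:
            rank = position  # a new value takes its row position, leaving gaps after ties
            dense += 1       # dense_rank advances by one per distinct value
            previous = salary
        results.append((salary, position, rank, dense))
    return results


sales = [3000, 4600, 4100, 3000, 4100]  # Sales salaries from simpleData
for salary, row_num, rnk, dense in ranking_columns(sales):
    print(salary, row_num, rnk, dense)
```

The printed columns match the Sales rows of the `row_number`, `rank` and `dense_rank` outputs. In Spark, which of two tied rows receives the lower `row_number` is not guaranteed.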



In [None]:
from pyspark.sql.functions import percent_rank
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
    .show()

+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Jeff| Marketing|  3000|         1.0|
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  3000|         0.0|
|       Robert|     Sales|  4100|         0.5|
|         Saif|     Sales|  4100|         0.5|
|      Michael|     Sales|  4600|         1.0|
+-------------+----------+------+------------+



In [None]:
from pyspark.sql.functions import ntile
df.withColumn("ntile",ntile(2).over(windowSpec)) \
    .show()

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
+-------------+----------+------+-----+



In [None]:
from pyspark.sql.functions import cume_dist    
df.withColumn("cume_dist",cume_dist().over(windowSpec)) \
   .show()

+-------------+----------+------+------------------+
|employee_name|department|salary|         cume_dist|
+-------------+----------+------+------------------+
|        Maria|   Finance|  3000|0.3333333333333333|
|        Scott|   Finance|  3300|0.6666666666666666|
|          Jen|   Finance|  3900|               1.0|
|        Kumar| Marketing|  2000|               0.5|
|         Jeff| Marketing|  3000|               1.0|
|        James|     Sales|  3000|               0.4|
|        James|     Sales|  3000|               0.4|
|       Robert|     Sales|  4100|               0.8|
|         Saif|     Sales|  4100|               0.8|
|      Michael|     Sales|  4600|               1.0|
+-------------+----------+------+------------------+
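`percent_rank`, `ntile` and `cume_dist` follow simple formulas over a partition of n ordered rows: percent_rank = (rank - 1) / (n - 1), cume_dist = (number of rows with a value <= the current one) / n, and `ntile(k)` splits the rows into k buckets as evenly as possible, giving the leftover rows to the first buckets. The hypothetical plain-Python helper `distribution_columns` applies these formulas so they can be checked against the three outputs above.

```python
from bisect import bisect_left, bisect_right


def distribution_columns(salaries, buckets=2):
    """Return (salary, percent_rank, ntile, cume_dist) tuples for one window
    partition, ordered ascending like Window.orderBy("salary")."""
    ordered = sorted(salaries)
    n = len(ordered)
    base, extra = divmod(n, buckets)  # the first `extra` buckets get base + 1 rows
    big = extra * (base + 1)          # rows covered by those larger buckets
    rows = []
    for i, salary in enumerate(ordered):
        rank = bisect_left(ordered, salary) + 1  # rank of the first tied row
        percent_rank = 0.0 if n == 1 else (rank - 1) / (n - 1)
        if i < big:
            tile = i // (base + 1) + 1
        else:
            tile = extra + (i - big) // base + 1
        cume_dist = bisect_right(ordered, salary) / n  # share of rows <= salary
        rows.append((salary, percent_rank, tile, cume_dist))
    return rows


for row in distribution_columns([3000, 4600, 4100, 3000, 4100]):
    print(row)
```

For the Sales salaries this prints percent ranks 0.0, 0.0, 0.5, 0.5, 1.0, tiles 1, 1, 1, 2, 2 and cumulative distributions 0.4, 0.4, 0.8, 0.8, 1.0, the same values as the Spark outputs.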



In [None]:
from pyspark.sql.functions import lag    
df.withColumn("lag",lag("salary",2).over(windowSpec)) \
      .show()

+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|null|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|3000|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
|        James|     Sales|  3000|null|
|        James|     Sales|  3000|null|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sales|  4100|3000|
|      Michael|     Sales|  4600|4100|
+-------------+----------+------+----+



In [None]:
from pyspark.sql.functions import lead    
df.withColumn("lead",lead("salary",2).over(windowSpec)) \
    .show()

+-------------+----------+------+----+
|employee_name|department|salary|lead|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|3900|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|null|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
|        James|     Sales|  3000|4100|
|        James|     Sales|  3000|4100|
|       Robert|     Sales|  4100|4600|
|         Saif|     Sales|  4100|null|
|      Michael|     Sales|  4600|null|
+-------------+----------+------+----+
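`lag` and `lead` simply look a fixed number of rows backwards or forwards within the ordered partition and return null when that row does not exist. The plain-Python functions `lag_values` and `lead_values` are hypothetical models of `lag(col, offset, default)` and `lead(col, offset, default)`, with `None` standing in for null.

```python
def lag_values(values, offset=1, default=None):
    """Value `offset` rows earlier in the ordered partition, else default."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]


def lead_values(values, offset=1, default=None):
    """Value `offset` rows later in the ordered partition, else default."""
    return [values[i + offset] if i + offset < len(values) else default
            for i in range(len(values))]


sales = sorted([3000, 4600, 4100, 3000, 4100])  # ordered as in windowSpec
print(lag_values(sales, 2))   # [None, None, 3000, 3000, 4100]
print(lead_values(sales, 2))  # [4100, 4100, 4600, None, None]
```

These lists match the Sales rows of the `lag` and `lead` outputs.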



In [None]:
from pyspark.sql.functions import col, avg, sum, min, max, row_number

# Unordered window: each aggregate covers the whole department partition
windowSpecAgg = Window.partitionBy("department")
df.withColumn("row",row_number().over(windowSpec)) \
  .withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
  .withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
  .withColumn("min", min(col("salary")).over(windowSpecAgg)) \
  .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
  .where(col("row")==1).select("department","avg","sum","min","max") \
  .show()

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
|     Sales|3760.0|18800|3000|4600|
+----------+------+-----+----+----+
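An aggregate over an unordered window attaches the same partition-wide value to every row of a department; filtering on `row == 1` then keeps one summary row per department. A plain-Python sketch of the same computation, using the `simpleData` rows (the names `employees`, `by_dept` and `summary` are illustrative):

```python
from collections import defaultdict

# (employee_name, department, salary) rows copied from simpleData
employees = [
    ("James", "Sales", 3000), ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100), ("Maria", "Finance", 3000),
    ("James", "Sales", 3000), ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900), ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000), ("Saif", "Sales", 4100),
]

# Collect salaries per department, like Window.partitionBy("department")
by_dept = defaultdict(list)
for _name, dept, salary in employees:
    by_dept[dept].append(salary)

# Uses Python's built-in sum/min/max: run this in a fresh session, because the
# cell above imports PySpark's sum/min/max under the same names.
summary = {
    dept: (sum(s) / len(s), sum(s), min(s), max(s))
    for dept, s in sorted(by_dept.items())
}
for dept, (avg_s, sum_s, min_s, max_s) in summary.items():
    print(dept, avg_s, sum_s, min_s, max_s)
```

The three printed lines reproduce the avg, sum, min and max columns of the Spark output.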



In [None]:
summ1=spark.read.csv("/content/2010-summary.csv",header=True,inferSchema=True)
summ2=spark.read.csv("/content/2011-summary.csv",header=True,inferSchema=True)
summ3=spark.read.csv("/content/2012-summary.csv",header=True,inferSchema=True)
summ4=spark.read.csv("/content/2013-summary.csv",header=True,inferSchema=True)
summ5=spark.read.csv("/content/2014-summary.csv",header=True,inferSchema=True)
summ6=spark.read.csv("/content/2015-summary.csv",header=True,inferSchema=True)

In [None]:
print(summ1.columns)
print(summ2.columns)
print(summ3.columns)
print(summ4.columns)
print(summ5.columns)
print(summ6.columns)

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']


# **Day 4 (chapter 8)**


# 1. Joins

# 1.1 Try out examples for each type of join
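The row-level behaviour of the join types exercised in this section can be modelled in plain Python. `join_model` is a hypothetical helper, not a Spark API; its `how` strings mirror Spark's join-type names, `None` stands in for null, and row order is simplified. The sample (DEST_COUNTRY_NAME, count) pairs are taken from the full outer join output of 2010 vs 2014 in this section.

```python
JOIN_TYPES = {"inner", "left_outer", "right_outer", "full_outer", "left_semi", "left_anti"}


def join_model(left, right, how):
    """Join two lists of (key, value) pairs on key."""
    if how not in JOIN_TYPES:
        raise ValueError(f"unsupported join type: {how}")
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    left_keys = {key for key, _ in left}

    out = []
    for key, lval in left:
        matches = right_by_key.get(key, [])
        if how == "left_semi":
            if matches:
                out.append((key, lval))  # left columns only
        elif how == "left_anti":
            if not matches:
                out.append((key, lval))  # left rows with no match
        elif matches:
            out.extend((key, lval, rval) for rval in matches)  # one row per match
        elif how in ("left_outer", "full_outer"):
            out.append((key, lval, None))  # keep the unmatched left row
    if how in ("right_outer", "full_outer"):
        # keep unmatched right rows with a null left side
        out.extend((key, None, rval) for key, rval in right if key not in left_keys)
    return out


left_2010 = [("Afghanistan", 11), ("Angola", 14), ("Anguilla", 21)]
right_2014 = [("Algeria", 9), ("Angola", 13), ("Anguilla", 34)]
for how in ("inner", "left_outer", "right_outer", "full_outer", "left_semi", "left_anti"):
    print(how, join_model(left_2010, right_2014, how))
```

Afghanistan (only in 2010) survives the left and full outer joins with a null right side, Algeria (only in 2014) survives the right and full outer joins, and the semi join keeps only the left columns of the matching rows.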


# Inner Join:

In [None]:
inner_join = summ1.join(summ2, ["DEST_COUNTRY_NAME"], "inner")
inner_join.show(5)

+-----------------+-------------------+-----+--------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+--------------------+-----+
|    United States|            Romania|    1|               Haiti|  197|
|    United States|            Romania|    1|       French Guiana|   11|
|    United States|            Romania|    1|Saint Kitts and N...|  120|
|    United States|            Romania|    1| Trinidad and Tobago|  213|
|    United States|            Romania|    1|             Bolivia|   51|
+-----------------+-------------------+-----+--------------------+-----+
only showing top 5 rows



# Left Outer Join:

In [None]:
left_outer_join = summ3.join(summ4, ["DEST_COUNTRY_NAME"], "left_outer")
left_outer_join.show(5)

+-----------------+-------------------+-----+--------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+--------------------+-----+
|    United States|            Croatia|    1|               Haiti|  186|
|    United States|            Croatia|    1|       French Guiana|    3|
|    United States|            Croatia|    1|Saint Kitts and N...|  115|
|    United States|            Croatia|    1|             Bolivia|   13|
|    United States|            Croatia|    1| Trinidad and Tobago|  184|
+-----------------+-------------------+-----+--------------------+-----+
only showing top 5 rows



# Right Outer Join:

In [None]:
right_outer_join = summ5.join(summ6, ["DEST_COUNTRY_NAME"], "right_outer")
right_outer_join.show(5)

+-----------------+--------------------+-----+-------------------+-----+
|DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|ORIGIN_COUNTRY_NAME|count|
+-----------------+--------------------+-----+-------------------+-----+
|    United States|               Haiti|  193|            Romania|   15|
|    United States|Saint Kitts and N...|  123|            Romania|   15|
|    United States|       French Guiana|    4|            Romania|   15|
|    United States|             Bolivia|   14|            Romania|   15|
|    United States| Trinidad and Tobago|  175|            Romania|   15|
+-----------------+--------------------+-----+-------------------+-----+
only showing top 5 rows



# Full Outer Join:

In [None]:
full_outer_join = summ1.join(summ5, ["DEST_COUNTRY_NAME"], "full_outer")
full_outer_join.show(5)

+-------------------+-------------------+-----+-------------------+-----+
|  DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|ORIGIN_COUNTRY_NAME|count|
+-------------------+-------------------+-----+-------------------+-----+
|        Afghanistan|      United States|   11|               null| null|
|            Algeria|               null| null|      United States|    9|
|             Angola|      United States|   14|      United States|   13|
|           Anguilla|      United States|   21|      United States|   34|
|Antigua and Barbuda|      United States|  123|      United States|  115|
+-------------------+-------------------+-----+-------------------+-----+
only showing top 5 rows



# Left Semi Join:

In [None]:
left_semi_join = summ1.join(summ3, summ1.DEST_COUNTRY_NAME == summ3.DEST_COUNTRY_NAME, "leftsemi")
left_semi_join.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|    United States|          Singapore|   25|
+-----------------+-------------------+-----+
only showing top 5 rows



# Left Anti Join:

In [None]:
left_anti_join = summ2.join(summ4, summ2.DEST_COUNTRY_NAME == summ4.DEST_COUNTRY_NAME, "leftanti")
left_anti_join.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|            Malta|      United States|    1|
|            Yemen|      United States|    1|
|       The Gambia|      United States|    1|
|           Guinea|      United States|    5|
|          Croatia|      United States|    2|
+-----------------+-------------------+-----+
only showing top 5 rows



# Cross Join:

In [None]:
cross_join = summ2.crossJoin(summ5)
cross_join.show(5)

+-----------------+-------------------+-----+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+-----------------+-------------------+-----+
|    United States|       Saint Martin|    2|    United States|       Saint Martin|    1|
|    United States|       Saint Martin|    2|    United States|            Romania|   12|
|    United States|       Saint Martin|    2|    United States|            Croatia|    2|
|    United States|       Saint Martin|    2|    United States|            Ireland|  291|
|    United States|       Saint Martin|    2|    United States|              India|   62|
+-----------------+-------------------+-----+-----------------+-------------------+-----+
only showing top 5 rows
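The join types above can be compared side by side with a small pure-Python model. This is a sketch of the semantics only, not how Spark executes joins. The rows are made-up (key, value) tuples, None stands in for Spark's null, and the join function is hypothetical. Its type names mirror the how strings used above.

```python
# Toy model of Spark's join types on (key, value) rows; not Spark's implementation.
MATCHING_TYPES = {"inner", "left_outer", "right_outer", "full_outer"}


def join(left, right, how):
    """Join two lists of (key, value) rows on their key."""
    if how == "cross":
        # Cartesian product: every left row paired with every right row.
        return [(lk, lv, rk, rv) for lk, lv in left for rk, rv in right]

    left_keys = {k for k, _ in left}
    right_keys = {k for k, _ in right}

    if how == "leftsemi":
        # Left rows that have at least one match; no right-side columns.
        return [(k, v) for k, v in left if k in right_keys]
    if how == "leftanti":
        # Left rows that have no match at all.
        return [(k, v) for k, v in left if k not in right_keys]
    if how not in MATCHING_TYPES:
        raise ValueError(f"unsupported join type: {how}")

    # Matching pairs: the inner-join part shared by the four types above.
    out = [(lk, lv, rv) for lk, lv in left for rk, rv in right if lk == rk]
    if how in ("left_outer", "full_outer"):
        # Unmatched left rows, padded with None on the right.
        out += [(k, v, None) for k, v in left if k not in right_keys]
    if how in ("right_outer", "full_outer"):
        # Unmatched right rows, padded with None on the left.
        out += [(k, None, v) for k, v in right if k not in left_keys]
    return out


left = [("Egypt", 15), ("Malta", 1), ("India", 62)]
right = [("Egypt", 24), ("Yemen", 1)]
for how in ["inner", "left_outer", "right_outer", "full_outer", "leftsemi", "leftanti"]:
    print(how, join(left, right, how))
```

A semi join returns only left-side columns. Unlike an inner join, it never repeats a left row when the key matches several right rows.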



# 1.2. Handling Duplicate column names

**1:** Renaming the columns: You can use the withColumnRenamed method to rename the duplicate columns before the join. For example:

In [None]:
summ1.withColumnRenamed("DEST_COUNTRY_NAME", "DEST_COUNTRY_NAME_1").show(5)
summ2.withColumnRenamed("DEST_COUNTRY_NAME", "DEST_COUNTRY_NAME_2").show(5)


+-------------------+-------------------+-----+
|DEST_COUNTRY_NAME_1|ORIGIN_COUNTRY_NAME|count|
+-------------------+-------------------+-----+
|      United States|            Romania|    1|
|      United States|            Ireland|  264|
|      United States|              India|   69|
|              Egypt|      United States|   24|
|  Equatorial Guinea|      United States|    1|
+-------------------+-------------------+-----+
only showing top 5 rows

+-------------------+-------------------+-----+
|DEST_COUNTRY_NAME_2|ORIGIN_COUNTRY_NAME|count|
+-------------------+-------------------+-----+
|      United States|       Saint Martin|    2|
|      United States|             Guinea|    2|
|      United States|            Croatia|    1|
|      United States|            Romania|    3|
|      United States|            Ireland|  268|
+-------------------+-------------------+-----+
only showing top 5 rows



**2:** Using aliases: When selecting columns, you can give a duplicate column a new name with the alias method. Inside selectExpr, you can use the SQL AS keyword instead. For example:

In [None]:
summ1.select(summ1["DEST_COUNTRY_NAME"].alias("DEST_COUNTRY_NAME_1")).show(5)
summ2.selectExpr("DEST_COUNTRY_NAME as DEST_COUNTRY_NAME_1").show(5)

+-------------------+
|DEST_COUNTRY_NAME_1|
+-------------------+
|      United States|
|      United States|
|      United States|
|              Egypt|
|  Equatorial Guinea|
+-------------------+
only showing top 5 rows

+-------------------+
|DEST_COUNTRY_NAME_1|
+-------------------+
|      United States|
|      United States|
|      United States|
|      United States|
|      United States|
+-------------------+
only showing top 5 rows



**3:** Using the withColumn method: You can use the withColumn method to add a new column with a new name, and then drop the original column. For example:

In [None]:
summ = summ1.withColumn("DEST_COUNTRY_NAME_1", summ1["DEST_COUNTRY_NAME"])
summ.drop("DEST_COUNTRY_NAME").show(5)

+-------------------+-----+-------------------+
|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME_1|
+-------------------+-----+-------------------+
|            Romania|    1|      United States|
|            Ireland|  264|      United States|
|              India|   69|      United States|
|      United States|   24|              Egypt|
|      United States|    1|  Equatorial Guinea|
+-------------------+-----+-------------------+
only showing top 5 rows



**4:** Using the select method: You can use the select method to select only the columns you need from the DataFrame, which will remove the duplicate columns. For example:

In [None]:
summ.select("DEST_COUNTRY_NAME_1","count","ORIGIN_COUNTRY_NAME").show(5)

+-------------------+-----+-------------------+
|DEST_COUNTRY_NAME_1|count|ORIGIN_COUNTRY_NAME|
+-------------------+-----+-------------------+
|      United States|    1|            Romania|
|      United States|  264|            Ireland|
|      United States|   69|              India|
|              Egypt|   24|      United States|
|  Equatorial Guinea|    1|      United States|
+-------------------+-----+-------------------+
only showing top 5 rows



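**5:** Using the drop method: You can use the drop method to remove duplicate columns after joining the DataFrames. Passing a column name drops every column with that name, which removes both DEST_COUNTRY_NAME columns in the example below. To drop only one side's copy, pass a column reference such as summ2.DEST_COUNTRY_NAME instead. For example: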

In [None]:
# Named "joined" rather than "sum" to avoid shadowing Python's built-in sum()
joined = summ.join(summ2, summ.DEST_COUNTRY_NAME == summ2.DEST_COUNTRY_NAME, "inner")
joined.drop("DEST_COUNTRY_NAME").show(5)

+-------------------+-----+-------------------+--------------------+-----+
|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME_1| ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+-------------------+--------------------+-----+
|            Romania|    1|      United States|               Haiti|  197|
|            Romania|    1|      United States|       French Guiana|   11|
|            Romania|    1|      United States|Saint Kitts and N...|  120|
|            Romania|    1|      United States| Trinidad and Tobago|  213|
|            Romania|    1|      United States|             Bolivia|   51|
+-------------------+-----+-------------------+--------------------+-----+
only showing top 5 rows
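When both inputs share several non-key columns (here ORIGIN_COUNTRY_NAME and count), the renaming from option 1 can be planned programmatically. The helper below is a hypothetical pure-Python sketch. It only computes (old, new) name pairs, which you would then apply with withColumnRenamed in a loop. The _1/_2 suffixes follow the naming used above.

```python
def rename_plan(left_cols, right_cols, keys, suffixes=("_1", "_2")):
    """Return (left_pairs, right_pairs) of (old_name, new_name) for every
    column that exists on both sides but is not a join key."""
    shared = [c for c in left_cols if c in right_cols and c not in keys]
    left_pairs = [(c, c + suffixes[0]) for c in shared]
    right_pairs = [(c, c + suffixes[1]) for c in shared]
    return left_pairs, right_pairs


cols = ["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count"]
left_pairs, right_pairs = rename_plan(cols, cols, keys=["DEST_COUNTRY_NAME"])
print(left_pairs)
print(right_pairs)

# Applying the plan in PySpark (not run here; assumes summ1/summ2 from above):
# left_df, right_df = summ1, summ2
# for old, new in left_pairs:
#     left_df = left_df.withColumnRenamed(old, new)
# for old, new in right_pairs:
#     right_df = right_df.withColumnRenamed(old, new)
# left_df.join(right_df, ["DEST_COUNTRY_NAME"], "inner")
```

Joining on a list of key names, as in the inner join example, already keeps a single copy of the key column. Only the non-key columns need suffixes.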



# 3. How Spark performs joins

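In PySpark, a join is written as left.join(other, on, how), where:

- other is the DataFrame to join with.
- on is a column name, a list of column names, or a join expression.
- how is the join type. The default is "inner".

The example below joins summ1 with itself (a self-join):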

In [None]:
summ1.join(summ1, summ1.DEST_COUNTRY_NAME == summ1.DEST_COUNTRY_NAME, "inner").show(5)

+-----------------+-------------------+-----+-----------------+--------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+-----------------+--------------------+-----+
|    United States|            Romania|    1|    United States|              Uganda|    1|
|    United States|            Romania|    1|    United States|               Haiti|  226|
|    United States|            Romania|    1|    United States|       French Guiana|    1|
|    United States|            Romania|    1|    United States|Saint Kitts and N...|  127|
|    United States|            Romania|    1|    United States|            Slovakia|    1|
+-----------------+-------------------+-----+-----------------+--------------------+-----+
only showing top 5 rows



Calling join only records the join in the query plan. The work happens when an action such as show() runs. Spark then picks a physical join strategy and executes it roughly as follows:

1. Choose a strategy: Spark broadcasts one DataFrame if it is small enough. By default, that means its estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB). A full copy then goes to every executor. Otherwise Spark uses a shuffle-based join, by default a sort-merge join.

2. Partition the data: Each DataFrame is already split into partitions (chunks of rows, not separate RDDs) that are distributed across the executors.

3. Shuffle the data (shuffle joins only): Both DataFrames are repartitioned by the join key, so all rows with the same key land in the same partition. A broadcast join skips this step, because every executor already holds the whole small table.

4. Perform the join: Each task joins its partition locally, using the join condition and join type passed to join.

5. Produce the result: The joined rows stay distributed across the executors as the partitions of a new DataFrame. Nothing is collected to the driver unless an action such as collect() or show() asks for it.

Join performance depends on the size of the DataFrames and on how the join keys are distributed. A shuffle moves data across the network. If the keys are skewed (for example, "United States" accounts for most rows in this dataset), a few tasks end up doing most of the work. When one side is small, a broadcast join avoids the shuffle entirely; it is Spark's form of a map-side join. When the same large tables are joined repeatedly, bucketing both tables on the join key can avoid repeated shuffles.
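The two strategies described above can be modelled in a few lines of pure Python. This is a simplified sketch, not Spark code:

- Python's hash() stands in for Spark's partitioner.
- An in-memory dict plays the role of the hash table.
- The shuffle join shown is a shuffle hash join. Spark's default for large equi-joins is sort-merge, which sorts each partition instead of hashing it but shuffles the same way.

```python
from collections import defaultdict


def hash_partition(rows, num_partitions):
    """Shuffle step: send each (key, value) row to partition hash(key) % n,
    so rows with equal keys always end up in the same partition."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in rows:
        parts[hash(key) % num_partitions].append((key, value))
    return parts


def local_hash_join(left_part, right_part):
    """Inner-join two co-located partitions with an in-memory hash table."""
    table = defaultdict(list)
    for key, value in right_part:
        table[key].append(value)
    return [(key, lv, rv) for key, lv in left_part for rv in table.get(key, [])]


def shuffle_join(left, right, num_partitions=4):
    """Both sides are shuffled by key; partition i of the left side is
    joined only with partition i of the right side (one task each)."""
    left_parts = hash_partition(left, num_partitions)
    right_parts = hash_partition(right, num_partitions)
    result = []
    for lp, rp in zip(left_parts, right_parts):
        result.extend(local_hash_join(lp, rp))
    return result


def broadcast_join(large_partitions, small):
    """The small side is copied to every task; the large side keeps its
    existing partitioning, so it is never shuffled."""
    result = []
    for part in large_partitions:
        result.extend(local_hash_join(part, small))
    return result


large = [("United States", 15), ("United States", 1), ("Egypt", 24), ("Malta", 1)]
small = [("United States", "US"), ("Egypt", "EG")]

print(sorted(shuffle_join(large, small)))
print(sorted(broadcast_join([large[:2], large[2:]], small)))
```

The skew problem shows up here too. Every "United States" row hashes to the same partition, so with the real flight data one task would receive most of the rows.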

# **Day 5 (chapter 9)**

# 1. Datasources


# 1.1. Basics of reading data


The usual entry point for reading data in PySpark is the read property of the SparkSession, which returns a DataFrameReader.

Here is an example of how to read a CSV file:

In [None]:
#df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)

This example assumes a SparkSession named spark already exists. read.csv loads the CSV file at the given path. header=True uses the first row as column names. inferSchema=True makes Spark scan the data to infer each column's type, which costs an extra pass over the data; without it, every column is read as a string.

You can read other formats the same way, for example JSON and Parquet with spark.read.json() and spark.read.parquet().
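Here is a rough pure-Python sketch of the idea behind inferSchema. It tries progressively wider types and keeps the narrowest one that parses every value. Spark's actual CSV inference considers more types, such as long, timestamp and boolean. The three types and the infer_type function here are simplifying assumptions.

```python
def infer_type(values):
    """Return 'int', 'double' or 'string': the narrowest type that can parse
    every non-empty value. Empty strings are treated as nulls and ignored."""
    present = [v for v in values if v != ""]

    def all_parse(cast):
        try:
            for v in present:
                cast(v)
        except ValueError:
            return False
        return True

    if not present:
        return "string"
    if all_parse(int):
        return "int"
    if all_parse(float):
        return "double"
    return "string"


# Values as they appear in a CSV file: everything starts out as text.
print(infer_type(["15", "1", "344"]))       # int
print(infer_type(["1.5", "2"]))            # double
print(infer_type(["Romania", "Croatia"]))  # string
```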

# 2. Basics of writing data


The usual entry point for writing data in PySpark is the write property of a DataFrame, which returns a DataFrameWriter.

Here is an example of how to write a DataFrame to a CSV file:

In [None]:
#df.write.csv("path/to/new_file.csv", header=True)

This example writes the DataFrame df as CSV to the given path, and header=True writes the column names as the first row. The path becomes a directory containing one or more part files (typically one per partition) plus a _SUCCESS marker, not a single CSV file. By default the write also fails if the path already exists; use mode("overwrite") to replace it.

You can write other formats the same way, for example JSON and Parquet with df.write.json() and df.write.parquet().
You can also write to relational databases such as MySQL with df.write.jdbc(), and to metastore tables such as Hive tables with df.write.saveAsTable().

# 3. CSV files - reading, writing


In [None]:
#df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)

#df.write.csv("path/to/new_file.csv", header=True)

# 4. Reading and writing JSON files


In [None]:
#df = spark.read.json("path/to/file.json")

#df.write.json("path/to/new_file.json")
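By default, spark.read.json expects JSON Lines: one complete JSON object per line rather than a single JSON array. Multi-line documents need .option("multiLine", True). df.write.json produces the same one-object-per-line layout. The pure-Python sketch below only illustrates the format with the standard json module. The helper names are hypothetical, and the two records are taken from the 2015 summary data shown later in this notebook.

```python
import json

records = [
    {"DEST_COUNTRY_NAME": "United States", "ORIGIN_COUNTRY_NAME": "Romania", "count": 15},
    {"DEST_COUNTRY_NAME": "Egypt", "ORIGIN_COUNTRY_NAME": "United States", "count": 15},
]


def to_json_lines(rows):
    """Serialize each record as one JSON object per line."""
    return "\n".join(json.dumps(r) for r in rows) + "\n"


def from_json_lines(text):
    """Parse JSON Lines text back into records, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


text = to_json_lines(records)
print(text)
```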

# 5. Parquet files - important

Parquet is a columnar storage format that is widely used in the Apache Hadoop ecosystem and is supported by many big data processing frameworks, including PySpark. There are several reasons why Parquet is important in PySpark:

**Efficiency:** Parquet stores data in a columnar format, which means that only the required columns are read and processed, rather than reading and processing the entire row. This leads to significant performance improvements when working with large datasets.

**Compression:** Parquet supports various compression algorithms, such as Snappy and Gzip, which can greatly reduce the storage space required for large datasets.

**Schema evolution:** Parquet supports schema evolution. A dataset's schema can change over time (for example, by adding columns) without rewriting the existing files. This is particularly useful when working with data that is constantly changing or evolving. In Spark, reading files with different but compatible schemas together requires the mergeSchema option.

**Predicate pushdown:** Parquet stores min/max statistics for each column in every row group. Filter conditions can therefore be pushed down to the file reader. The reader skips row groups that cannot contain matching rows instead of reading them and filtering afterwards, which further reduces the work done on large datasets.

**Interoperability:** Parquet is an open standard, which means that it can be used with a wide variety of data processing frameworks, including PySpark, Hive, Pig, and Impala.
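The efficiency and predicate-pushdown points can be illustrated with a toy model of a columnar file:

- Rows are split into row groups.
- Each group stores its values column by column.
- Each column keeps min/max statistics.

This is a simplified pure-Python sketch with made-up rows, not the real Parquet layout, which adds pages, encodings and compression. The function names are hypothetical.

```python
def to_row_groups(rows, columns, group_size):
    """Split rows into groups; store each group column by column with min/max stats."""
    groups = []
    for start in range(0, len(rows), group_size):
        chunk = rows[start:start + group_size]
        data = {name: [row[i] for row in chunk] for i, name in enumerate(columns)}
        stats = {name: (min(vals), max(vals)) for name, vals in data.items()}
        groups.append({"data": data, "stats": stats})
    return groups


def scan(groups, select_col, filter_col, min_value):
    """Return select_col values where filter_col >= min_value.

    Column pruning: only select_col and filter_col are touched.
    Predicate pushdown: a group whose max is below min_value is skipped
    without reading any of its values."""
    result, groups_read = [], 0
    for group in groups:
        if group["stats"][filter_col][1] < min_value:
            continue
        groups_read += 1
        selected = group["data"][select_col]
        filtered = group["data"][filter_col]
        result.extend(s for s, f in zip(selected, filtered) if f >= min_value)
    return result, groups_read


columns = ["DEST", "ORIGIN", "count"]
rows = [("Malta", "US", 1), ("Yemen", "US", 1),
        ("Egypt", "US", 15), ("Guyana", "US", 64),
        ("Canada", "US", 500), ("Mexico", "US", 700)]
groups = to_row_groups(rows, columns, group_size=2)
print(scan(groups, "DEST", "count", 100))
```

Only one of the three groups is read. Skipping works best when the data is sorted or clustered on the filter column, as it is in this toy example.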

# 6. Reading and Writing parquet files


To read and write parquet files in PySpark you can use the read.parquet() and write.parquet() methods respectively. Here is an example:

In [None]:
#df = spark.read.parquet("path/to/file.parquet")

#df.write.parquet("path/to/new_file.parquet")

# 7. orc - optional


ORC (Optimized Row Columnar) is a file format that is similar to Parquet and is also widely used in the Apache Hadoop ecosystem. Like Parquet, ORC is a columnar storage format that is designed to improve the performance and storage efficiency of big data processing frameworks, such as PySpark.

Here are some of the key benefits of using ORC in PySpark:

**Performance:** ORC stores data in a columnar format, which leads to significant performance improvements when working with large datasets.

**Compression:** ORC supports various compression algorithms, such as Snappy, Zlib, and LZO, which can greatly reduce the storage space required for large datasets.

**Schema evolution:** ORC supports schema evolution, which means that a dataset's schema can be changed over time without having to rewrite the entire dataset.

**Predicate pushdown:** ORC supports predicate pushdown, which means that filtering conditions can be pushed down to the storage layer, rather than being applied in the query layer.

**Interoperability:** ORC is an open standard, which means that it can be used with a wide variety of data processing frameworks, including PySpark, Hive, Pig, and Impala.

# 8. Splittable File Types and Compression

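A file is splittable when Spark can divide it into chunks that separate tasks read in parallel. Uncompressed CSV and JSON Lines files are splittable. A gzip-compressed CSV or JSON file is not: it must be decompressed from the beginning, so one task reads the whole file. Parquet and ORC compress data internally in independent blocks, so they stay splittable with codecs such as Snappy or gzip. You choose the format and codec when writing. For example, use write.parquet() for Parquet, or write.format("avro") for Avro, which requires the separate spark-avro package.

Here is an example of writing a Parquet file with Snappy compression: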

In [None]:
#df.write.format("parquet").option("compression", "snappy").save("path/to/new_file.parquet")
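Python's gzip module shows why whole-file compression blocks parallelism. In this sketch (toy data, not Spark code), a single gzip stream cannot be decoded starting from the middle. A task assigned the second half of the file would have nothing usable. Compressing fixed-size blocks independently, which is roughly what columnar formats do internally, lets a separate task decode each block on its own.

```python
import gzip
import zlib

lines = [f"row {i}\n".encode() for i in range(1000)]
data = b"".join(lines)

# Whole-file compression: one continuous gzip stream.
whole = gzip.compress(data)


def can_decompress(chunk):
    """True if the bytes form a complete, decodable gzip stream."""
    try:
        gzip.decompress(chunk)
    except (OSError, EOFError, zlib.error):
        return False
    return True


# Block-wise compression: each block of 250 lines is its own gzip stream.
blocks = [gzip.compress(b"".join(lines[i:i + 250])) for i in range(0, len(lines), 250)]

print(can_decompress(whole))                    # True: read from the start
print(can_decompress(whole[len(whole) // 2:]))  # False: cannot start mid-stream
print(gzip.decompress(blocks[2])[:8])           # b'row 500\n': third block decodes alone
```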

# 9. Managing file size


Managing file size in PySpark is an important consideration when working with large datasets. There are several strategies that can be used to manage the file size of your data, including:

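**Partitioning:** Partitioning is the process of dividing a large dataset into smaller, more manageable chunks. In PySpark, you can partition your data with the partitionBy method when writing. It splits the output into subdirectories based on a specific column, such as a date or a category. Each write task produces its own files, so the number and size of the output files also depend on how many in-memory partitions the DataFrame has. You can change that number with repartition() or coalesce() before writing.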

**Compression:** As mentioned previously, compression reduces the size of the stored data. You choose a codec such as Snappy, Gzip, LZO, or Zlib when writing, for example with option("compression", "snappy"). When reading, Spark detects the codec automatically.

**File format:** Choosing the appropriate file format for your data can also help manage file size. Columnar file formats, such as Parquet and ORC, are often more efficient than row-based formats, such as CSV or JSON, when working with large datasets because they require less storage space and are more easily compressible.

**Filtering:** Filtering is the process of removing unnecessary data from your dataset. In PySpark, you can use the filter method to remove rows that do not meet specific criteria. Writing only the rows you need reduces the size of the output.

**Sampling:** Sampling is the process of selecting a random subset of your data to work with. In PySpark, the sample method returns approximately a given fraction of the rows, for example df.sample(fraction=0.1). This lets you develop and test on a smaller, more manageable subset that approximates the whole dataset. Results computed on a sample are estimates, not exact values.
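The directory layout that partitionBy produces can be sketched in pure Python. Take a write such as df.write.partitionBy("DEST_COUNTRY_NAME").parquet(path). Spark creates one subdirectory per distinct value, named column=value. It leaves that column out of the data files because its value is already in the path. The function below is a hypothetical simplification: real Spark also escapes special characters in names and writes one or more part files per directory.

```python
from collections import defaultdict


def partition_layout(rows, columns, partition_col):
    """Group rows into Hive-style 'column=value' directories, removing the
    partition column from the stored rows."""
    idx = columns.index(partition_col)
    layout = defaultdict(list)
    for row in rows:
        directory = f"{partition_col}={row[idx]}"
        layout[directory].append(tuple(v for i, v in enumerate(row) if i != idx))
    return dict(layout)


columns = ["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count"]
rows = [("United States", "Romania", 15),
        ("United States", "Croatia", 1),
        ("Egypt", "United States", 15)]
for directory, stored in partition_layout(rows, columns, "DEST_COUNTRY_NAME").items():
    print(directory, stored)
```

Choose a partition column with a moderate number of distinct values. Partitioning on a high-cardinality column creates many small files, the opposite of what this section is trying to achieve.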

# Importing

In [None]:
import gdown
file_id = "1lUfnGU_v1ZUw1dRrovASH9M5BgYeAyxQ"
url = f"https://drive.google.com/uc?id={file_id}"
gdown.download(url, "2015-summary.csv", quiet=False)

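# No inferSchema here, so every column (including count) is read as a string.
# This is why the SUM(count) results below are shown as doubles, e.g. 411352.0.
df = spark.read.format("csv").option("header", "true").load("2015-summary.csv")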

Downloading...
From: https://drive.google.com/uc?id=1lUfnGU_v1ZUw1dRrovASH9M5BgYeAyxQ
To: /content/2015-summary.csv
100%|██████████| 7.08k/7.08k [00:00<00:00, 3.88MB/s]


In [None]:
df.createOrReplaceTempView("summary")

In [None]:
spark.sql('SELECT * FROM summary').show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

1. How many rows (origin/destination pairs) does the table contain?

In [None]:
spark.sql('SELECT COUNT(*) AS ROWS_count FROM summary').show()

+----------+
|ROWS_count|
+----------+
|       256|
+----------+



2. What are the top 10 destination countries by count?

In [None]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          GROUP BY DEST_COUNTRY_NAME\
          ORDER BY total_count DESC\
          LIMIT 10\
          ").show()

+------------------+-----------+
| DEST_COUNTRY_NAME|total_count|
+------------------+-----------+
|     United States|   411352.0|
|            Canada|     8399.0|
|            Mexico|     7140.0|
|    United Kingdom|     2025.0|
|             Japan|     1548.0|
|           Germany|     1468.0|
|Dominican Republic|     1353.0|
|       South Korea|     1048.0|
|       The Bahamas|      955.0|
|            France|      935.0|
+------------------+-----------+



3. How many flights originated from the United States?

In [None]:
spark.sql("SELECT SUM(count)\
          FROM summary\
          WHERE ORIGIN_COUNTRY_NAME = 'United States'\
          ").show()

+----------+
|sum(count)|
+----------+
|  411966.0|
+----------+



4. What are the top 5 origin countries for flights to Japan?

In [None]:
spark.sql("SELECT ORIGIN_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          WHERE DEST_COUNTRY_NAME = 'Japan'\
          GROUP BY ORIGIN_COUNTRY_NAME\
          ORDER BY total_count DESC\
          LIMIT 5\
          ").show()

+-------------------+-----------+
|ORIGIN_COUNTRY_NAME|total_count|
+-------------------+-----------+
|      United States|     1548.0|
+-------------------+-----------+



5. What is the total number of flights to the United States?

In [None]:
spark.sql("SELECT SUM(count)\
          FROM summary\
          WHERE DEST_COUNTRY_NAME = 'United States'\
          ").show()

+----------+
|sum(count)|
+----------+
|  411352.0|
+----------+



6. What is the total number of flights from the United States?


In [None]:
spark.sql("SELECT SUM(count)\
          FROM summary\
          WHERE ORIGIN_COUNTRY_NAME = 'United States'\
          ").show()

+----------+
|sum(count)|
+----------+
|  411966.0|
+----------+



7. What are the top 10 origin and destination pairs by count?

In [None]:
spark.sql("SELECT ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          GROUP BY ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME\
          ORDER BY total_count DESC\
          LIMIT 10\
").show()

+-------------------+-----------------+-----------+
|ORIGIN_COUNTRY_NAME|DEST_COUNTRY_NAME|total_count|
+-------------------+-----------------+-----------+
|      United States|    United States|   370002.0|
|             Canada|    United States|     8483.0|
|      United States|           Canada|     8399.0|
|             Mexico|    United States|     7187.0|
|      United States|           Mexico|     7140.0|
|      United States|   United Kingdom|     2025.0|
|     United Kingdom|    United States|     1970.0|
|      United States|            Japan|     1548.0|
|              Japan|    United States|     1496.0|
|      United States|          Germany|     1468.0|
+-------------------+-----------------+-----------+



8. How many flights originated from each country?

In [None]:
spark.sql("SELECT ORIGIN_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          GROUP BY ORIGIN_COUNTRY_NAME\
          ORDER BY total_count DESC\
").show()

+-------------------+-----------+
|ORIGIN_COUNTRY_NAME|total_count|
+-------------------+-----------+
|      United States|   411966.0|
|             Canada|     8483.0|
|             Mexico|     7187.0|
|     United Kingdom|     1970.0|
|              Japan|     1496.0|
| Dominican Republic|     1420.0|
|            Germany|     1336.0|
|        The Bahamas|      986.0|
|             France|      952.0|
|              China|      920.0|
|           Colombia|      867.0|
|        South Korea|      827.0|
|            Jamaica|      712.0|
|        Netherlands|      660.0|
|             Brazil|      619.0|
|         Costa Rica|      608.0|
|        El Salvador|      508.0|
|               Cuba|      478.0|
|             Panama|      465.0|
|              Spain|      442.0|
+-------------------+-----------+
only showing top 20 rows



9. How many flights went to each country?

In [None]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          GROUP BY DEST_COUNTRY_NAME\
          ORDER BY total_count DESC\
").show()

+------------------+-----------+
| DEST_COUNTRY_NAME|total_count|
+------------------+-----------+
|     United States|   411352.0|
|            Canada|     8399.0|
|            Mexico|     7140.0|
|    United Kingdom|     2025.0|
|             Japan|     1548.0|
|           Germany|     1468.0|
|Dominican Republic|     1353.0|
|       South Korea|     1048.0|
|       The Bahamas|      955.0|
|            France|      935.0|
|          Colombia|      873.0|
|            Brazil|      853.0|
|       Netherlands|      776.0|
|             China|      772.0|
|           Jamaica|      666.0|
|        Costa Rica|      588.0|
|       El Salvador|      561.0|
|            Panama|      510.0|
|              Cuba|      466.0|
|             Spain|      420.0|
+------------------+-----------+
only showing top 20 rows



10. What is the total number of flights between the United States and Canada?

In [None]:
spark.sql("SELECT SUM(count)\
  FROM summary\
  WHERE (ORIGIN_COUNTRY_NAME = 'United States' AND DEST_COUNTRY_NAME = 'Canada') OR (ORIGIN_COUNTRY_NAME = 'Canada' AND DEST_COUNTRY_NAME = 'United States')\
").show()


+----------+
|sum(count)|
+----------+
|   16882.0|
+----------+



11. What are the 5 most common origin countries for flights to the United Kingdom?

In [None]:
spark.sql("SELECT ORIGIN_COUNTRY_NAME, SUM(count) as total_count\
  FROM summary\
  WHERE DEST_COUNTRY_NAME = 'United Kingdom'\
  GROUP BY ORIGIN_COUNTRY_NAME\
  ORDER BY total_count DESC\
  LIMIT 5\
").show()

+-------------------+-----------+
|ORIGIN_COUNTRY_NAME|total_count|
+-------------------+-----------+
|      United States|     2025.0|
+-------------------+-----------+



12. What are the top 10 destination countries for flights from China?


In [None]:
spark.sql("\
  SELECT DEST_COUNTRY_NAME, SUM(count) as total_count\
  FROM summary\
  WHERE ORIGIN_COUNTRY_NAME = 'China'\
  GROUP BY DEST_COUNTRY_NAME\
  ORDER BY total_count DESC\
  LIMIT 10\
").show()

+-----------------+-----------+
|DEST_COUNTRY_NAME|total_count|
+-----------------+-----------+
|    United States|      920.0|
+-----------------+-----------+



13. What is the total number of flights between the United States and New Zealand?

In [None]:
spark.sql("SELECT SUM(count)\
  FROM summary\
  WHERE (ORIGIN_COUNTRY_NAME = 'United States' AND DEST_COUNTRY_NAME = 'New Zealand') OR (ORIGIN_COUNTRY_NAME = 'New Zealand' AND DEST_COUNTRY_NAME = 'United States')\
").show()


+----------+
|sum(count)|
+----------+
|     185.0|
+----------+



14. What is the total number of flights from India?

In [None]:
spark.sql("SELECT SUM(count)\
  FROM summary\
  WHERE ORIGIN_COUNTRY_NAME = 'India'\
").show()

+----------+
|sum(count)|
+----------+
|      62.0|
+----------+



15. How do the destination countries rank by total number of flights?

In [None]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count,\
  RANK() OVER (ORDER BY SUM(count) DESC) as rank\
  FROM summary\
  GROUP BY DEST_COUNTRY_NAME\
").show()

+------------------+-----------+----+
| DEST_COUNTRY_NAME|total_count|rank|
+------------------+-----------+----+
|     United States|   411352.0|   1|
|            Canada|     8399.0|   2|
|            Mexico|     7140.0|   3|
|    United Kingdom|     2025.0|   4|
|             Japan|     1548.0|   5|
|           Germany|     1468.0|   6|
|Dominican Republic|     1353.0|   7|
|       South Korea|     1048.0|   8|
|       The Bahamas|      955.0|   9|
|            France|      935.0|  10|
|          Colombia|      873.0|  11|
|            Brazil|      853.0|  12|
|       Netherlands|      776.0|  13|
|             China|      772.0|  14|
|           Jamaica|      666.0|  15|
|        Costa Rica|      588.0|  16|
|       El Salvador|      561.0|  17|
|            Panama|      510.0|  18|
|              Cuba|      466.0|  19|
|             Spain|      420.0|  20|
+------------------+-----------+----+
only showing top 20 rows



16. How do the destination countries rank by number of flights from France?

In [None]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count,\
  RANK() OVER (ORDER BY SUM(count) DESC) as rank\
  FROM summary\
  WHERE ORIGIN_COUNTRY_NAME = 'France'\
  GROUP BY DEST_COUNTRY_NAME\
").show()

+-----------------+-----------+----+
|DEST_COUNTRY_NAME|total_count|rank|
+-----------------+-----------+----+
|    United States|      952.0|   1|
+-----------------+-----------+----+
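The totals in the rows shown above are all distinct, so RANK() simply numbers them. When totals tie, RANK() gives the tied rows the same rank and skips the following ranks. A small pure-Python sketch of that rule, using made-up totals (the sql_rank function is hypothetical):

```python
def sql_rank(totals):
    """Mimic RANK() OVER (ORDER BY total DESC): ties share a rank, and the
    rank after a tie skips by the number of tied rows."""
    ordered = sorted(totals.items(), key=lambda item: item[1], reverse=True)
    ranks, previous_total, current_rank = {}, None, 0
    for position, (name, total) in enumerate(ordered, start=1):
        if total != previous_total:
            current_rank = position
            previous_total = total
        ranks[name] = current_rank
    return ranks


print(sql_rank({"A": 10, "B": 7, "C": 7, "D": 3}))  # {'A': 1, 'B': 2, 'C': 2, 'D': 4}
```

DENSE_RANK() would instead give D rank 3, because it does not skip ranks after a tie.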



17. What is the cumulative sum of flights to each destination country, ordered by the number of flights?

In [None]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count,\
  SUM(SUM(count)) OVER (ORDER BY SUM(count) DESC) as cumulative_sum\
  FROM summary\
  GROUP BY DEST_COUNTRY_NAME\
  ORDER BY total_count DESC\
").show()

+------------------+-----------+--------------+
| DEST_COUNTRY_NAME|total_count|cumulative_sum|
+------------------+-----------+--------------+
|     United States|   411352.0|      411352.0|
|            Canada|     8399.0|      419751.0|
|            Mexico|     7140.0|      426891.0|
|    United Kingdom|     2025.0|      428916.0|
|             Japan|     1548.0|      430464.0|
|           Germany|     1468.0|      431932.0|
|Dominican Republic|     1353.0|      433285.0|
|       South Korea|     1048.0|      434333.0|
|       The Bahamas|      955.0|      435288.0|
|            France|      935.0|      436223.0|
|          Colombia|      873.0|      437096.0|
|            Brazil|      853.0|      437949.0|
|       Netherlands|      776.0|      438725.0|
|             China|      772.0|      439497.0|
|           Jamaica|      666.0|      440163.0|
|        Costa Rica|      588.0|      440751.0|
|       El Salvador|      561.0|      441312.0|
|            Panama|      510.0|      44