<a href="https://colab.research.google.com/github/yola02/BigData/blob/main/Lecture4_flightdata_read.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!wget https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/refs/heads/master/data/flight-data/csv/2015-summary.csv

--2025-09-10 17:27:53--  https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/refs/heads/master/data/flight-data/csv/2015-summary.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7080 (6.9K) [text/plain]
Saving to: ‘2015-summary.csv’


2025-09-10 17:27:53 (62.2 MB/s) - ‘2015-summary.csv’ saved [7080/7080]



In [2]:
!head /content/2015-summary.csv

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40


In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("flight").getOrCreate()
flightData2015 = spark.read.option("inferSchema", "true").option("header", "true").csv("/content/2015-summary.csv")


In [4]:
flightData2015.count()

256

In [5]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [6]:
flightData2015.createOrReplaceTempView("flight_data_2015")

SQL-way

In [7]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

In [8]:
sqlWay.show(5)

+-----------------+--------+
|DEST_COUNTRY_NAME|count(1)|
+-----------------+--------+
|         Anguilla|       1|
|           Russia|       1|
|         Paraguay|       1|
|          Senegal|       1|
|           Sweden|       1|
+-----------------+--------+
only showing top 5 rows



Dataframe way

In [9]:
dataFrameWay = flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .count()

In [10]:
dataFrameWay.show(5)

+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|         Anguilla|    1|
|           Russia|    1|
|         Paraguay|    1|
|          Senegal|    1|
|           Sweden|    1|
+-----------------+-----+
only showing top 5 rows



In [11]:
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=159]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=172]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2

In [12]:
from pyspark.sql.functions import max

flightData2015.select(max("count")).take(1)

maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [13]:

from pyspark.sql.functions import desc

flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .sum("count")\
  .withColumnRenamed("sum(count)", "destination_total")\
  .sort(desc("destination_total"))\
  .limit(5)\
  .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [14]:
flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .sum("count")\
  .withColumnRenamed("sum(count)", "destination_total")\
  .sort(desc("destination_total"))\
  .limit(5)\
  .explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#130L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#17,destination_total#130L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[sum(count#19)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=310]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_sum(count#19)])
            +- FileScan csv [DEST_COUNTRY_NAME#17,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




Can you follow the previouse examples to retrieve the count of males and females who are greater than 15 years old? ou can have the data here: https://drive.google.com/drive/folders/1ehWwunuAo7CE1Vk2JYkUnQMmxh5pph3C

In [17]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [18]:
titanic = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/content/drive/MyDrive/DATA/titanic.csv")

In [20]:
titanic.show(5)

+---+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|_c0|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+---+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|  0|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|  1|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|  2|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|  3|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|  4|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
+---+-----------+--------+------+---------------

In [21]:
#just an example with dataframe feature and groupby
titanic.groupBy("Pclass").count().show()

+------+-----+
|Pclass|count|
+------+-----+
|     3|  469|
|     1|  201|
|     ?|   49|
|     2|  172|
+------+-----+



In [22]:
titanic.createOrReplaceTempView("titanic_table")

In [23]:
spark.sql("""
SELECT Sex, count(1)
FROM titanic_table
WHERE Age > 15
GROUP BY Sex
""").show()

+------+--------+
|   Sex|count(1)|
+------+--------+
|female|     218|
|  male|     413|
+------+--------+



In [24]:
spark.sql("""
SELECT Sex, count(1)
FROM titanic_table
WHERE Age > 15
GROUP BY Sex
""").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[Sex#157], functions=[count(1)])
   +- Exchange hashpartitioning(Sex#157, 200), ENSURE_REQUIREMENTS, [plan_id=470]
      +- HashAggregate(keys=[Sex#157], functions=[partial_count(1)])
         +- Project [Sex#157]
            +- Filter (isnotnull(Age#158) AND (Age#158 > 15.0))
               +- FileScan csv [Sex#157,Age#158] Batched: false, DataFilters: [isnotnull(Age#158), (Age#158 > 15.0)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/DATA/titanic.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Age), GreaterThan(Age,15.0)], ReadSchema: struct<Sex:string,Age:double>


