# Ejemplo de procesamiento con Spark SQL
# Referencia: https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch04.html
# Datos: https://github.com/databricks/LearningSparkV2/tree/master/databricks-datasets/learning-spark-v2/flights

In [1]:
# Colab setup for Spark/PySpark: mount Google Drive under /content/gdrive so
# files stored in MyDrive can be read from this notebook.
from google.colab import drive

drive.mount(mountpoint='/content/gdrive')

Mounted at /content/gdrive


In [2]:
#instalar java y spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
import os

# Tell findspark/pyspark where the JDK and the unpacked Spark distribution live.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64",
    SPARK_HOME="/content/spark-3.5.1-bin-hadoop3",
)

In [4]:
# findspark.init() must run before `pyspark` is imported; it presumably locates
# the Spark install via SPARK_HOME and makes pyspark importable.
import findspark

findspark.init()

from pyspark.sql import SparkSession

# Local-mode session ("local[*]" = one worker thread per available core).
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# Handle to the underlying SparkContext.
sc = spark.sparkContext

In [5]:
# Echo the SparkSession object so the notebook displays it.
spark

In [6]:
# Echo the SparkContext object so the notebook displays it.
sc

In [7]:
# Path to the flight-delays CSV inside the mounted Google Drive. The path is
# absolute (under the /content/gdrive mount point) so it does not depend on the
# notebook's current working directory.
csvFile = "/content/gdrive/MyDrive/st0263-241/bigdata/datasets/flights/departuredelays.csv"

In [8]:
# Load the CSV dataset with an explicit schema instead of inferSchema=True.
# Schema inference needs an extra full pass over the file. These are exactly the
# types inference produced for this file (see the printSchema output), so the
# resulting DataFrame is the same.
# NOTE(review): `date` values look like MMddHHmm timestamps (e.g. 1011245 is
# presumably "01011245"). Reading them as INT drops the month's leading zero.
# Consider STRING if you need to parse the date (the cited chapter appears to
# use STRING; confirm).
# Equivalent read from S3:
# df = spark.read.csv('s3://<bucket/dir>/sample_data.csv', inferSchema=True, header=True)
schema = "date INT, delay INT, distance INT, origin STRING, destination STRING"
df = spark.read.csv(csvFile, schema=schema, header=True)

In [9]:
# Column names of the DataFrame.
df.columns

['date', 'delay', 'distance', 'origin', 'destination']

In [10]:
# Number of columns in the DataFrame.
len(df.columns)

5

In [11]:
# Number of records (rows) in the DataFrame. count() is a Spark action, so this
# runs a job over the data.
df.count()

1391578

In [12]:
# Dataset "shape" as a (rows, columns) tuple, in the style of pandas' DataFrame.shape.
n_rows = df.count()
n_cols = len(df.columns)
print((n_rows, n_cols))

(1391578, 5)


In [13]:
# Print the schema: column names, data types and nullability.
df.printSchema()

root
 |-- date: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [14]:
# First 5 rows of the DataFrame.
df.show(5)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 5 rows



In [15]:
# Summary statistics (count, mean, stddev, min, max) for every column.
df.describe().show()

+-------+-----------------+------------------+-----------------+-------+-----------+
|summary|             date|             delay|         distance| origin|destination|
+-------+-----------------+------------------+-----------------+-------+-----------+
|  count|          1391578|           1391578|          1391578|1391578|    1391578|
|   mean|2180446.584000322|12.079802928761449|690.5508264718184|   null|       null|
| stddev|838031.1536740534| 38.80773374985635|513.6628153663347|   null|       null|
|    min|          1010005|              -112|               21|    ABE|        ABE|
|    max|          3312359|              1642|             4330|    YUM|        YUM|
+-------+-----------------+------------------+-----------------+-------+-----------+



In [16]:
# Register the DataFrame as a temporary view so it can be queried by name with spark.sql().
df.createOrReplaceTempView("us_delay_flights_tbl")

In [17]:
# Flights longer than 1,000 miles, longest first; show the first 10 rows.
long_haul_query = """SELECT distance, origin, destination 
FROM us_delay_flights_tbl WHERE distance > 1000 
ORDER BY distance DESC"""
spark.sql(long_haul_query).show(10)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



In [18]:
# Flights from San Francisco (SFO) to Chicago (ORD) delayed by at least two
# hours (delay is in minutes), worst first.
# `>=` (not `>`) so a delay of exactly 120 minutes counts as "at least two
# hours", matching the 'Long Delays' category (which starts at 120).
spark.sql("""SELECT date, delay, origin, destination
FROM us_delay_flights_tbl
WHERE delay >= 120 AND origin = 'SFO' AND destination = 'ORD'
ORDER BY delay DESC""").show(10)

+-------+-----+------+-----------+
|   date|delay|origin|destination|
+-------+-----+------+-----------+
|2190925| 1638|   SFO|        ORD|
|1031755|  396|   SFO|        ORD|
|1022330|  326|   SFO|        ORD|
|1051205|  320|   SFO|        ORD|
|1190925|  297|   SFO|        ORD|
|2171115|  296|   SFO|        ORD|
|1071040|  279|   SFO|        ORD|
|1051550|  274|   SFO|        ORD|
|3120730|  266|   SFO|        ORD|
|1261104|  258|   SFO|        ORD|
+-------+-----+------+-----------+
only showing top 10 rows



In [19]:
# Label every US flight, regardless of origin and destination, with a
# human-readable delay category in a new Flight_Delays column (delay is in
# minutes):
#   Very Long Delays  delay > 360            (more than 6 hours)
#   Long Delays       120 <= delay <= 360    (2-6 hours)
#   Short Delays      60 <= delay < 120
#   Tolerable Delays  0 < delay < 60
#   No Delays         delay = 0
#   Early             anything else (negative delay; NULL delays would also land here)
# CASE branches are evaluated in order, so the first matching range wins.
spark.sql("""SELECT delay, origin, destination, 
              CASE
                  WHEN delay > 360 THEN 'Very Long Delays'
                  WHEN delay >= 120 AND delay <= 360 THEN 'Long Delays'
                  WHEN delay >= 60 AND delay < 120 THEN 'Short Delays'
                  WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays'
                  WHEN delay = 0 THEN 'No Delays'
                  ELSE 'Early'
               END AS Flight_Delays
               FROM us_delay_flights_tbl
               ORDER BY origin, delay DESC""").show(10)

+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|  333|   ABE|        ATL|  Long Delays|
|  305|   ABE|        ATL|  Long Delays|
|  275|   ABE|        ATL|  Long Delays|
|  257|   ABE|        ATL|  Long Delays|
|  247|   ABE|        ATL|  Long Delays|
|  247|   ABE|        DTW|  Long Delays|
|  219|   ABE|        ORD|  Long Delays|
|  211|   ABE|        ATL|  Long Delays|
|  197|   ABE|        DTW|  Long Delays|
|  192|   ABE|        ORD|  Long Delays|
+-----+------+-----------+-------------+
only showing top 10 rows



In [None]:
# Exercise: run the remaining queries from the reference cited at the top of this notebook.