<a href="https://colab.research.google.com/github/m-abubakar-saddique/spark-certification-exercises/blob/dev/Learning_Spark_Ch_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1><center>Learning Spark Chapter 4 Exercises</center></h1>

<a id='installing-spark'></a>
### Installing Spark

Install Dependencies:


1.   Java 8 (OpenJDK, headless)
2.   Apache Spark 3.1.1, pre-built for Hadoop 3.2
3.   findspark (locates the Spark installation and makes `pyspark` importable from Python)


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

Set Environment Variables:

In [None]:
import os

# Point Spark at the JDK installed via apt above and at the directory unpacked from the Spark archive.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

In [None]:
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [None]:
import findspark

# Must run before importing pyspark: it uses SPARK_HOME to put pyspark on sys.path.
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# Render a DataFrame as a formatted table when it is the last expression of a cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

# Downloading the flight departure-delays data

In [None]:
!wget https://raw.githubusercontent.com/databricks/LearningSparkV2/master/databricks-datasets/learning-spark-v2/flights/departuredelays.csv

--2023-06-30 09:50:01--  https://raw.githubusercontent.com/databricks/LearningSparkV2/master/databricks-datasets/learning-spark-v2/flights/departuredelays.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 33396236 (32M) [text/plain]
Saving to: ‘departuredelays.csv’


2023-06-30 09:50:02 (99.5 MB/s) - ‘departuredelays.csv’ saved [33396236/33396236]



## Reading Input Data

In [None]:
csv_file = "departuredelays.csv"

# Supply the schema explicitly rather than inferring it: inference costs an extra pass
# over the file, and `date` must stay a STRING so leading zeros (e.g. "01011245") survive.
schema = "`date` STRING, `delay` INT, `distance` INT, `origin` STRING, `destination` STRING"

df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv(csv_file)
)

# Register the DataFrame under a name so the SQL queries below can refer to it.
df.createOrReplaceTempView("us_delay_flights_tbl")

In [None]:
# Peek at the first rows to confirm the schema was applied as expected.
df.show(n=2)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
+--------+-----+--------+------+-----------+
only showing top 2 rows



## Find all flights whose distance is greater than 1,000 miles

In [None]:
long_haul_query = """
SELECT distance, origin, destination
  FROM us_delay_flights_tbl
 WHERE distance > 1000
 ORDER BY distance DESC
"""
spark.sql(long_haul_query).show(10)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



# Find all flights from SFO to ORD with at least a two-hour delay

In [None]:
# Each selected column needs its own comma: the previous `delay origin` silently aliased
# the delay column AS origin, so the output had no delay column at all.
# delay is presumably in minutes (the heading equates 120 with two hours); `>=` matches "at least".
spark.sql("""
SELECT date, delay, origin, destination
  FROM us_delay_flights_tbl
 WHERE origin = 'SFO'
   AND destination = 'ORD'
   AND delay >= 120
 ORDER BY delay DESC
""").show(10)

+--------+------+-----------+
|    date|origin|destination|
+--------+------+-----------+
|02190925|  1638|        ORD|
|01031755|   396|        ORD|
|01022330|   326|        ORD|
|01051205|   320|        ORD|
|01190925|   297|        ORD|
|02171115|   296|        ORD|
|01071040|   279|        ORD|
|01051550|   274|        ORD|
|03120730|   266|        ORD|
|01261104|   258|        ORD|
+--------+------+-----------+
only showing top 10 rows



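# Convert the `date` string to a Spark timestamp
The `date` column is an `MMddHHmm` string with no year, so a year is prepended and the result is parsed using a custom Python UDF (built on `datetime`) followed by Spark's `to_timestamp`.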

In [None]:
from datetime import datetime

from pyspark.sql.functions import concat, lit, to_timestamp, udf
from pyspark.sql.types import StringType

# The `date` column has month/day/hour/minute but no year, so a fixed year is prepended.
DATA_YEAR = '2022'


def convert_date_string(date_string):
    """Reformat a 'yyyyMMddHHmm' string as 'yyyy-MM-dd hh:mm AM/PM'.

    Args:
        date_string: A 12-digit string such as '202201011245', or None.

    Returns:
        The reformatted string (e.g. '2022-01-01 12:45 PM'), or None when
        date_string is None, so a null `date` yields a null timestamp instead
        of failing the Spark job with a TypeError.

    Raises:
        ValueError: If date_string does not match '%Y%m%d%H%M'.
    """
    if date_string is None:
        return None
    date_time_obj = datetime.strptime(date_string, '%Y%m%d%H%M')
    # NOTE: %p is locale-dependent; the 'a' pattern used by to_timestamp below expects English AM/PM.
    return date_time_obj.strftime('%Y-%m-%d %I:%M %p')


UDF = udf(convert_date_string, StringType())

# Stage 1: prepend the year, then reformat through the Python UDF (still a string column).
newdf = (df
         .withColumn('datetime', concat(lit(DATA_YEAR), df.date))
         .withColumn('datetime', UDF('datetime')))

# Stage 2: parse the 12-hour string into a real timestamp column. Assigned (not only
# displayed) so later cells can use the converted column.
flights_with_timestamp = newdf.withColumn(
    'datetime', to_timestamp(newdf.datetime, 'yyyy-MM-dd hh:mm a'))
flights_with_timestamp






date,delay,distance,origin,destination,datetime
1011245,6,602,ABE,ATL,2022-01-01 12:45:00
1020600,-8,369,ABE,DTW,2022-01-02 06:00:00
1021245,-2,602,ABE,ATL,2022-01-02 12:45:00
1020605,-4,602,ABE,ATL,2022-01-02 06:05:00
1031245,-4,602,ABE,ATL,2022-01-03 12:45:00
1030605,0,602,ABE,ATL,2022-01-03 06:05:00
1041243,10,602,ABE,ATL,2022-01-04 12:43:00
1040605,28,602,ABE,ATL,2022-01-04 06:05:00
1051245,88,602,ABE,ATL,2022-01-05 12:45:00
1050605,9,602,ABE,ATL,2022-01-05 06:05:00
