<a href="https://colab.research.google.com/github/yoerisamwel/code_notes/blob/main/practicing_partitions_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.5.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease [24.3 kB]
Get:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease [23.8 kB]
Get:11 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [665 kB]
Get:12 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 Packages [1,326 kB]
Get:13 http://arc

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder\
    .appName("SparkSQL")\
    .config("spark.sql.debug.maxToStringFields", 2000)\
    .config("spark.driver.memory", "2g")\
    .getOrCreate()

In [3]:
# Read in data from S3 Bucket
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/DelayedFlights.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("DelayedFlights.csv"), sep=",", header=True)
df.show()

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
| id|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|  0|2008|    1|         3|        4|   2003|      1955|   2211|      2225|       

In [7]:
# Create temp view named "delays"
df.createOrReplaceTempView('delays_view')

# Start the runtime
start_time = time.time()

# Using spark.sql write a query that gives you the total distance
# and the count of every unique Origin, Dest combination.
spark.sql("""SELECT concat(Origin, Dest) AS origin_dest_pair ,sum(Distance) FROM delays_view group by origin_dest_pair""").show()

# Print out the runtime.
print("--- %s seconds ---" % (time.time() - start_time))

+----------------+-------------+
|origin_dest_pair|sum(Distance)|
+----------------+-------------+
|          SMFONT|     280858.0|
|          DENMCI|     498355.0|
|          ALBTPA|      24860.0|
|          TULAUS|       7242.0|
|          CLTSRQ|      13128.0|
|          TULORD|     224640.0|
|          LGAPIT|      50250.0|
|          SLCPHL|     138672.0|
|          MEIATL|      20025.0|
|          GRBMQT|      10184.0|
|          SEASTL|     141930.0|
|          JAXBNA|     110352.0|
|          LAXOAK|     469778.0|
|          MCISAN|     137299.0|
|          PVDFLL|      34452.0|
|          SANMRY|      29328.0|
|          IAHMCI|     278419.0|
|          IAHMKE|     271584.0|
|          CLEORD|     283452.0|
|          DALIAH|      99169.0|
+----------------+-------------+
only showing top 20 rows

--- 5.747776031494141 seconds ---


In [8]:
# Write out the data in parquet format
df.write.parquet('parquet_delays', mode='overwrite')

In [9]:
# Read in our new parquet formatted data
p_df=spark.read.parquet('parquet_delays')

In [11]:
# Convert the DataFrame to a view.
p_df.createOrReplaceTempView('delays_view_2')

In [13]:
# Start the runtime
start_time = time.time()

# Run the same query here
spark.sql("""SELECT concat(Origin, Dest) AS origin_dest_pair ,sum(Distance) FROM delays_view_2 group by origin_dest_pair""").show()

# Print out the runtime
print("--- %s seconds ---" % (time.time() - start_time))

+----------------+-------------+
|origin_dest_pair|sum(Distance)|
+----------------+-------------+
|          SMFONT|     280858.0|
|          DENMCI|     498355.0|
|          ALBTPA|      24860.0|
|          TULAUS|       7242.0|
|          CLTSRQ|      13128.0|
|          TULORD|     224640.0|
|          LGAPIT|      50250.0|
|          SLCPHL|     138672.0|
|          MEIATL|      20025.0|
|          GRBMQT|      10184.0|
|          SEASTL|     141930.0|
|          JAXBNA|     110352.0|
|          LAXOAK|     469778.0|
|          MCISAN|     137299.0|
|          PVDFLL|      34452.0|
|          SANMRY|      29328.0|
|          IAHMCI|     278419.0|
|          IAHMKE|     271584.0|
|          CLEORD|     283452.0|
|          DALIAH|      99169.0|
+----------------+-------------+
only showing top 20 rows

--- 3.02018666267395 seconds ---


In [20]:
# Write out your parquet data, partitioning on the Origin column
from pyspark.sql.functions import concat, col
p_df = p_df.withColumn("origin_dest_pair", concat(col("Origin"), col("Dest")))

p_df.write.partitionBy("origin_dest_pair").parquet("parquet_delays_data2")

In [21]:
# Read in our new parquet formatted data
parquet_df = spark.read.parquet("parquet_delays_data2")


In [22]:
# Convert the dataframe to a view.
parquet_df.createOrReplaceTempView("my_parquet_view")


In [27]:
# Start the runtime
start_time = time.time()

# Run your query against your partitioned data one more time.
result_df = spark.sql("""
    SELECT concat(Origin, Dest) AS origin_dest_pair, sum(Distance) AS total_distance
    FROM my_parquet_view
    GROUP BY concat(Origin, Dest)
""")
result_df.show()

# Print out the runtime
print("--- %s seconds ---" % (time.time() - start_time))

+----------------+--------------+
|origin_dest_pair|total_distance|
+----------------+--------------+
|          LAXPHX|      510970.0|
|          DALHOU|      442150.0|
|          LASPHX|      400640.0|
|          LAXSFO|      879233.0|
|          LASLAX|      499612.0|
|          EWRATL|      902940.0|
|          DFWORD|     1306458.0|
|          MSPORD|      517700.0|
|          LGAATL|     1224449.0|
|          PHXLAS|      461056.0|
|          DENSLC|      593147.0|
|          HOUDAL|      470591.0|
|          PHXLAX|      622710.0|
|          LGAORD|     1445476.0|
|          ORDEWR|     1147524.0|
|          ORDLGA|     1828102.0|
|          ATLLGA|     1534176.0|
|          LAXLAS|      402852.0|
|          ATLEWR|     1065350.0|
|          SFOLAX|      753532.0|
+----------------+--------------+
only showing top 20 rows

--- 31.06057858467102 seconds ---


In [39]:
# Start  the runtime
start_time = time.time()

# Filter the data on something that selects your partition choice.
filtered_df = result_df.filter("origin_dest_pair = 'LAXSFO'")
# Print out the runtime.
print("--- %s seconds ---" % (time.time() - start_time))
filtered_df.show()


--- 0.014871597290039062 seconds ---
+----------------+--------------+
|origin_dest_pair|total_distance|
+----------------+--------------+
|          LAXSFO|      879233.0|
+----------------+--------------+



In [40]:
# Start  the runtime
start_time = time.time()

# Filter the data on something that has nothing to do with your partition choice.
filtered_df = result_df.filter("origin_dest_pair = 'ATLLGA'")
filtered_df.show()
# Print out the runtime.
print("--- %s seconds ---" % (time.time() - start_time))

+----------------+--------------+
|origin_dest_pair|total_distance|
+----------------+--------------+
|          ATLLGA|     1534176.0|
+----------------+--------------+

--- 22.98924469947815 seconds ---
