<a href="https://colab.research.google.com/github/linxiaoxin/DataEngineering/blob/main/colab/Spark%20Query%20Workshop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction
PySpark provides an interface for loading DataFrames from external storage systems. In this workshop we read files in several data formats (CSV, JSON, Parquet) into DataFrames and write DataFrames back out, using PySpark examples. Finally, we use Apache Arrow to transfer data efficiently between the JVM and Python processes.

In [1]:
# install pyspark using pip
!pip install --ignore-installed -q pyspark
# install findspark using pip
!pip install --ignore-installed -q findspark

from pyspark.sql import SparkSession

# Create (or reuse) a local SparkSession; the Spark UI is served on port 4050
spark = SparkSession.builder \
    .master("local") \
    .appName("Ingestion") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Read Driver CSV file
PySpark provides a DataFrameReader for loading a DataFrame from external storage systems (e.g. file systems, key-value stores). Access it through SparkSession.read, and use format(source) to specify the input data source format.  
With the DataFrameReader's csv("path") or format("csv").load("path"), you can read a CSV file into a PySpark DataFrame. Both methods take the path of the file to read as an argument. With format(...), you can specify a data source by its fully qualified name, but built-in sources can simply be referred to by their short names (csv, json, parquet, jdbc, text, etc.).
This example reads a single CSV file, BEAD_Rebu_Drivers.csv, into a DataFrame, taking the column names from the header row and letting Spark infer the column types (inferSchema).


In [5]:
# Read the drivers CSV file, using the header row and inferring column types
df = spark.read.format('csv') \
                .option("inferSchema","true") \
                .option("header","true") \
                .load("/content/drive/MyDrive/Colab Notebooks/data/BEAD_Rebu_Drivers.csv")

# Show result
df.show()

# Print schema
df.printSchema()


+---+-----------------+-----------+-------------+------+
|Sno|       DriverName|DriverPhone|TaxiIDDriving|Rating|
+---+-----------------+-----------+-------------+------+
|  1|Georgiana Iverson|   38587202|          209|   1.7|
|  2|   Ewell Rolstone|   88675586|          243|   4.9|
|  3|    Pedro Thacker|   94452422|          197|   2.6|
|  4|     Winn Kellard|   81521505|          456|   4.8|
|  5|   Ermin Trounson|   21644415|          372|   4.1|
|  6| Weylin Bernhardi|   89930924|          397|   4.7|
|  7|  Giuseppe Manton|   78503208|          463|   3.3|
|  8| Friedrich De'Ath|   64901517|          264|   4.4|
|  9|  Lauraine Galton|   28736147|          367|   4.7|
| 10|   Debra Willeman|   97189395|          277|   4.6|
| 11| Francene Gavriel|   88137354|           32|   4.9|
| 12|     Eyde Brosini|   34871916|          386|   4.2|
| 13|  Orelia Woolfoot|   28037658|          329|   4.1|
| 14|Christi Middleton|   97577827|          453|   4.8|
| 15|     Jamey Cecely|   48329

DataFrame[]

In [7]:
# Show every row (count() of them) without truncating long values
driversCount = df.count()
df.show(driversCount, truncate=False)

+---+-----------------------+-----------+-------------+------+
|Sno|DriverName             |DriverPhone|TaxiIDDriving|Rating|
+---+-----------------------+-----------+-------------+------+
|1  |Georgiana Iverson      |38587202   |209          |1.7   |
|2  |Ewell Rolstone         |88675586   |243          |4.9   |
|3  |Pedro Thacker          |94452422   |197          |2.6   |
|4  |Winn Kellard           |81521505   |456          |4.8   |
|5  |Ermin Trounson         |21644415   |372          |4.1   |
|6  |Weylin Bernhardi       |89930924   |397          |4.7   |
|7  |Giuseppe Manton        |78503208   |463          |3.3   |
|8  |Friedrich De'Ath       |64901517   |264          |4.4   |
|9  |Lauraine Galton        |28736147   |367          |4.7   |
|10 |Debra Willeman         |97189395   |277          |4.6   |
|11 |Francene Gavriel       |88137354   |32           |4.9   |
|12 |Eyde Brosini           |34871916   |386          |4.2   |
|13 |Orelia Woolfoot        |28037658   |329          |

In [None]:

# Write DataFrame to CSV
df.write.csv("people.csv", header=True, mode="overwrite")
df.show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+



# Read Taxi JSON
PySpark, the Python API for Apache Spark, reads and writes JSON data through its DataFrame API. Relevant capabilities include:
1. Reading/writing JSON from/to DataFrames and RDDs
2. Handling Nested JSON
3. Performance tuning via caching, partition control and cluster management.

In [8]:
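# multiLine lets Spark parse JSON records that span several lines (e.g. a pretty-printed array).
# header and inferSchema are CSV options; the JSON reader infers the schema on its own.
df = spark.read \
    .option("multiLine", "true") \
    .json("/content/drive/MyDrive/Colab Notebooks/data/BEAD_Rebu_TaxiCabs.json")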
df.show()
df.printSchema()



+--------+---------+------+--------------------+----------+---------------------+--------+
|  TMDTID|TaxiColor|TaxiID|       TaxiMakeModel|TaxiNumber|TaxiPassengerCapacity|TaxiType|
+--------+---------+------+--------------------+----------+---------------------+--------+
|TMA73889|   Yellow|     1|      Toyota Carolla|   SHZ2770|                    4|Standard|
|TMC04591|    Green|     2|        Suzuki Swift|   SHY4378|                    4|Mini Cab|
|TMA12020|     Blue|     3|        Toyota Prius|   SHX6464|                    4|Standard|
|TMB02825|   Silver|     4| Toyota Camry Hybrid|   SHX4872|                    4| Premier|
|TMC12882|    Green|     5|        Suzuki Swift|   SHX2609|                    4|Mini Cab|
|TMC45713|    Cream|     6|      Mercedes Viano|   SHY9111|                    7|Maxi Cab|
|TMA78092|     Blue|     7|Hyundai Ioniq Hybrid|   SHX5867|                    4|Standard|
|TMB17549|     Blue|     8|         Hyundai i40|   SHY6907|                    4|Standard|

In [9]:
df.sort('TaxiNumber').show()

+--------+---------+------+--------------------+----------+---------------------+--------+
|  TMDTID|TaxiColor|TaxiID|       TaxiMakeModel|TaxiNumber|TaxiPassengerCapacity|TaxiType|
+--------+---------+------+--------------------+----------+---------------------+--------+
|TMB18597|    White|    59|       Mercedes Benz|   SHX0106|                    4|Limosine|
|TMB76389|   Yellow|   464|Hyundai Ioniq Hybrid|   SHX0278|                    4|Standard|
|TMA22172|   Yellow|   352|Hyundai Ioniq Hybrid|   SHX0354|                    4|Standard|
|TMB30762|     Grey|   236| Toyota Camry Hybrid|   SHX0399|                    4| Premier|
|TMA25750|     Grey|   188| Toyota Camry Hybrid|   SHX0541|                    4| Premier|
|TMB51371|    White|    32|       Mercedes Benz|   SHX0588|                    4|Limosine|
|TMC74051|    Black|   378|     Toyota Vellfire|   SHX0613|                    6|Maxi Cab|
|TMA27837|     Blue|   460|         Hyundai i40|   SHX0694|                    4|Standard|


In [15]:
df.where("TaxiType = 'Premier' AND TaxiPassengerCapacity = 4").show()

+--------+---------+------+-------------------+----------+---------------------+--------+
|  TMDTID|TaxiColor|TaxiID|      TaxiMakeModel|TaxiNumber|TaxiPassengerCapacity|TaxiType|
+--------+---------+------+-------------------+----------+---------------------+--------+
|TMB02825|   Silver|     4|Toyota Camry Hybrid|   SHX4872|                    4| Premier|
|TMB67907|   Silver|    16|Toyota Camry Hybrid|   SHY9209|                    4| Premier|
|TMC66738|     Grey|    18|Toyota Camry Hybrid|   SHY2915|                    4| Premier|
|TMC10802|     Grey|    23|Toyota Camry Hybrid|   SHY9485|                    4| Premier|
|TMA12261|     Grey|    37|Toyota Camry Hybrid|   SHY7838|                    4| Premier|
|TMA41233|   Silver|    43|Toyota Camry Hybrid|   SHX5153|                    4| Premier|
|TMA79747|     Grey|    57|Toyota Camry Hybrid|   SHY3876|                    4| Premier|
|TMB16199|   Silver|    60|Toyota Camry Hybrid|   SHZ3746|                    4| Premier|
|TMC67061|

The query above retrieves all 4-seater Premier taxis.

In [None]:
# Show selected columns for all Premier taxis, regardless of capacity
df.select('TaxiNumber', 'TaxiType', 'TaxiColor').where("TaxiType = 'Premier'").show()

# Read Trip Data CSV

# Parquet
PySpark reads and writes Parquet files efficiently and at scale, taking advantage of Parquet's columnar storage format (for example, reading only the columns a query needs). Capabilities include:
1. Reading/Writing from/to DataFrame/RDD
2. Performance tuning via compression, optimization, partition control and cluster management.

In [None]:
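# Assumed stand-in: peopleDF is not defined earlier, so build it from the people rows shown later.
peopleDF = spark.createDataFrame([("Michael", None), ("Andy", 30), ("Justin", 19)], "name string, age int")
# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.format("parquet").mode("overwrite").save("people.parquet")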


In [None]:
# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")
# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()



+------+
|  name|
+------+
|Justin|
+------+



# PyArrow
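PySpark can use PyArrow to transfer data efficiently between Spark (the JVM) and Python, for example when converting a Spark DataFrame to pandas with toPandas() or a pandas DataFrame to Spark with createDataFrame(). PyArrow is the Python library of the Apache Arrow project, which defines a columnar in-memory format that speeds up processing, especially for large datasets. PyArrow can also read and write CSV and Parquet files directly, as the next cell shows.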

In [None]:
import pyarrow.csv as pv
import pyarrow.parquet as pq

# Read the HDB resale flat prices CSV into an Arrow table
hdb_table = pv.read_csv("/content/drive/MyDrive/data/DataFormat/resale-flat-prices-based-on-registration-date-from-mar-2012-to-dec-2014.csv")
# Write the Arrow table out as a Parquet file
parquet_path = 'resale-flat-prices-based-on-registration-date-from-mar-2012-to-dec-2014.parquet'
pq.write_table(hdb_table, parquet_path)
hdb_parquet = pq.ParquetFile(parquet_path)
# Inspect the file-level Parquet metadata
print(hdb_parquet.metadata)
# Inspect the metadata of the first row group
print(hdb_parquet.metadata.row_group(0))
# Inspect the statistics of the 10th column chunk (index 9) in the first row group
print(hdb_parquet.metadata.row_group(0).column(9).statistics)



<pyarrow._parquet.FileMetaData object at 0x7e1bff756e30>
  created_by: parquet-cpp-arrow version 14.0.2
  num_columns: 10
  num_rows: 52203
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 2079
<pyarrow._parquet.RowGroupMetaData object at 0x7e1c2dadaa20>
  num_columns: 10
  num_rows: 52203
  total_byte_size: 431095
<pyarrow._parquet.Statistics object at 0x7e1bff9d3740>
  has_min_max: True
  min: 195000.0
  max: 1088888.0
  null_count: 0
  distinct_count: None
  num_values: 52203
  physical_type: DOUBLE
  logical_type: None
  converted_type (legacy): NONE


In [None]:
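# Convert Spark to pandas, using Arrow for the transfer
import pandas as pd
# Spark 3.x key; the older spark.sql.execution.arrow.enabled is deprecated
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")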
pandas_df = df.toPandas()
pandas_df.head()

Unnamed: 0,age,job,name
0,,,Michael
1,30.0,developer,Andy
2,19.0,,Justin


In [None]:
# Convert pandas to Spark (also uses Arrow when it is enabled)
spark_df = spark.createDataFrame(pandas_df)
spark_df.show()

+----+---------+-------+
| age|      job|   name|
+----+---------+-------+
|NULL|     NULL|Michael|
|30.0|developer|   Andy|
|19.0|     NULL| Justin|
+----+---------+-------+



==== End of Workshop ====

### For Printing

In [None]:
def colab2pdf():
    ENABLE=True # @param {type:"boolean"}
    if ENABLE:
        import os, datetime, json, locale, pathlib, urllib, requests, werkzeug, nbformat, google, yaml, warnings
        locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
        NAME = pathlib.Path(werkzeug.utils.secure_filename(urllib.parse.unquote(requests.get(f"http://{os.environ['COLAB_JUPYTER_IP']}:{os.environ['KMP_TARGET_PORT']}/api/sessions").json()[0]["name"])))
        TEMP = pathlib.Path("/content/pdfs") / f"{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}_{NAME.stem}"; TEMP.mkdir(parents=True, exist_ok=True)
        NB = [cell for cell in nbformat.reads(json.dumps(google.colab._message.blocking_request("get_ipynb", timeout_sec=30)["ipynb"]), as_version=4).cells if "--Colab2PDF" not in cell.source]
        warnings.filterwarnings('ignore', category=nbformat.validator.MissingIDFieldWarning)
        with (TEMP / f"{NAME.stem}.ipynb").open("w", encoding="utf-8") as nb_copy: nbformat.write(nbformat.v4.new_notebook(cells=NB or [nbformat.v4.new_code_cell("#")]), nb_copy)
        if not pathlib.Path("/usr/local/bin/quarto").exists():
            !wget -q "https://quarto.org/download/latest/quarto-linux-amd64.deb" -P {TEMP} && dpkg -i {TEMP}/quarto-linux-amd64.deb > /dev/null && quarto install tinytex --update-path --quiet
        with (TEMP / "config.yml").open("w", encoding="utf-8") as file: yaml.dump({'include-in-header': [{"text": r"\usepackage{fvextra}\DefineVerbatimEnvironment{Highlighting}{Verbatim}{breaksymbolleft={},showspaces=false,showtabs=false,breaklines,breakanywhere,commandchars=\\\{\}}"}],'include-before-body': [{"text": r"\DefineVerbatimEnvironment{verbatim}{Verbatim}{breaksymbolleft={},showspaces=false,showtabs=false,breaklines}"}]}, file)
        !quarto render {TEMP}/{NAME.stem}.ipynb --metadata-file={TEMP}/config.yml --to pdf -M latex-auto-install -M margin-top=1in -M margin-bottom=1in -M margin-left=1in -M margin-right=1in --quiet
        google.colab.files.download(str(TEMP / f"{NAME.stem}.pdf"))
colab2pdf()
