# Data Processing - Overview

In this section we will get an overview of Data Processing using Spark with Python.
* Pre-requisites and Objectives
* Starting Spark Context
* Overview of Spark read APIs
* Understand airlines data
* Inferring Schema
* Previewing airlines data
* Overview of Data Frame APIs
* Overview of Functions
* Overview of Spark Write APIs
* Reorganizing airlines data
* Previewing reorganized data
* Analyze and Understand Data
* Conclusion

## Pre-requisites and Objectives

Let us understand prerequisites before getting into the topics related to this section.
* Good understanding of Data Processing using Python.
* Data Processing Life Cycle
  * Reading Data from files
  * Processing Data using APIs
  * Writing Processed Data back to files
* We can also use Databases as sources and sinks. It will be covered at a later point in time.
* We can also read data in streaming fashion which is out of the scope of this course.

We will get an overview of the Data Processing Life Cycle by the end of the section.
* Read airlines data from the file.
* Preview the schema and data to understand the characteristics of the data.
* Get an overview of Data Frame APIs as well as functions used to process the data.
* Check if there are any duplicates in the data.
* Get an overview of how to write Data Frames to files using file formats such as Parquet, with compression.
* Reorganize the data by month, using a different file format and a partitioning strategy.
* We will deep dive into Data Frame APIs to process the data in subsequent modules.

## Starting Spark Context

Let us start Spark Context using SparkSession.

* `SparkSession` is a class that is part of the `pyspark.sql` package.
* It is a wrapper on top of Spark Context.
* When a Spark application is launched using `spark-submit`, `spark-shell` or `pyspark`, a Spark Context is started in the driver program.
* Spark Context maintains the context of all the jobs that are submitted until it is stopped.
* We first need to create a `SparkSession` object. It can have any name, but typically we use `spark`. Once it is created, several APIs are exposed, including `read`.
* While creating the `SparkSession` object, we need to at least set the application name and specify the execution mode in which Spark Context should run.
* We can use `appName` to specify the name of the application and `master` to specify the execution mode.
* Below is a sample code snippet which starts the Spark Session for us.

In [1]:
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Data Processing - Overview'). \
    master('yarn'). \
    getOrCreate()

In [2]:
spark

## Overview of Spark read APIs

Let us get the overview of Spark read APIs to read files of different formats.

* `spark` has a bunch of APIs to read data from files of different formats.
* All APIs are exposed under `spark.read`
  * `text` - to read single column data from text files, or to read each whole text file as one record.
  * `csv` - to read text files with delimiters. The default delimiter is a comma, but we can use other delimiters as well.
  * `json` - to read data from JSON files.
  * `orc` - to read data from ORC files.
  * `parquet` - to read data from Parquet files.
  * We can also read data from other file formats by plugging in the relevant package and using `spark.read.format`.
* We can also pass options based on the file formats.
  * `inferSchema` - to infer the data types of the columns based on the data.
  * `header` - to use header to get the column names in case of text files.
  * `schema` - to explicitly specify the schema.
* We can get the help on APIs like `spark.read.csv` using `help(spark.read.csv)`.
* Reading delimited data from text files.

In [3]:
spark. \
    read. \
    csv('/public/retail_db/orders',
        header=True,
        schema='''
            order_id INT, 
            order_date STRING, 
            order_customer_id INT, 
            order_status STRING
        '''
       ). \
    show()

+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|
|       6|2013-07-25 00:00:...|             7130|       COMPLETE|
|       7|2013-07-25 00:00:...|             4530|       COMPLETE|
|       8|2013-07-25 00:00:...|             2911|     PROCESSING|
|       9|2013-07-25 00:00:...|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|             1837|         CLOSED|
|      13|2013-07-25 00:00:...|             9149|PENDING_PAYMENT|
|      14|

* Reading JSON data from text files. We can infer the schema from the data as each JSON object contains both the column name and the value.
* Example for JSON

```json
{
    "order_id": 1, 
    "order_date": "2013-07-25 00:00:00.0", 
    "order_customer_id": 12345, 
    "order_status": "COMPLETE"
}
```

In [4]:
spark. \
    read. \
    json('/public/retail_db_json/orders'). \
    show()

+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|            12111|2013-07-25 00:00:...|       3|       COMPLETE|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|
|            11318|2013-07-25 00:00:...|       5|       COMPLETE|
|             7130|2013-07-25 00:00:...|       6|       COMPLETE|
|             4530|2013-07-25 00:00:...|       7|       COMPLETE|
|             2911|2013-07-25 00:00:...|       8|     PROCESSING|
|             5657|2013-07-25 00:00:...|       9|PENDING_PAYMENT|
|             5648|2013-07-25 00:00:...|      10|PENDING_PAYMENT|
|              918|2013-07-25 00:00:...|      11| PAYMENT_REVIEW|
|             1837|2013-07-25 00:00:...|      12|         CLOSED|
|         

## Understand airlines data
Let us read one of the files and understand more about the data, so that we can determine the right API and options to process it later.
* Our airlines data is in text file format.
* We can use `spark.read.text` on one of the files to preview the data and understand the following
  * Whether header is present in files or not.
  * Field Delimiter that is being used.
* Once we determine details about header and field delimiter we can use `spark.read.csv` with appropriate options to read the data.

In [5]:
airlines = spark.read. \
    text("/public/airlines_all/airlines/part-00000")

In [6]:
type(airlines)

pyspark.sql.dataframe.DataFrame

In [7]:
help(airlines.show)

Help on method show in module pyspark.sql.dataframe:

show(n=20, truncate=True, vertical=False) method of pyspark.sql.dataframe.DataFrame instance
    Prints the first ``n`` rows to the console.
    
    :param n: Number of rows to show.
    :param truncate: If set to ``True``, truncate strings longer than 20 chars by default.
        If set to a number greater than one, truncates long strings to length ``truncate``
        and align cells right.
    :param vertical: If set to ``True``, print output rows vertically (one line
        per column value).
    
    >>> df
    DataFrame[age: int, name: string]
    >>> df.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  2|Alice|
    |  5|  Bob|
    +---+-----+
    >>> df.show(truncate=3)
    +---+----+
    |age|name|
    +---+----+
    |  2| Ali|
    |  5| Bob|
    +---+----+
    >>> df.show(vertical=True)
    -RECORD 0-----
     age  | 2
     name | Alice
    -RECORD 1-----
     age  | 5
     name | Bob
    
    .. versionadded:

In [8]:
airlines.show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Year,Month,Dayo

In [9]:
help(spark.read.text)

Help on method text in module pyspark.sql.readwriter:

text(paths, wholetext=False, lineSep=None) method of pyspark.sql.readwriter.DataFrameReader instance
    Loads text files and returns a :class:`DataFrame` whose schema starts with a
    string column named "value", and followed by partitioned columns if there
    are any.
    
    By default, each line in the text file is a new row in the resulting DataFrame.
    
    :param paths: string, or list of strings, for input path(s).
    :param wholetext: if true, read each file from input path(s) as a single row.
    :param lineSep: defines the line separator that should be used for parsing. If None is
                    set, it covers all ``\r``, ``\r\n`` and ``\n``.
    
    >>> df = spark.read.text('python/test_support/sql/text-test.txt')
    >>> df.collect()
    [Row(value='hello'), Row(value='this')]
    >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
    >>> df.collect()
    [Row(value='hello\nth



* The data has a header and each field is delimited by a comma.

## Inferring Schema

Let us understand how we can quickly get the schema using one file and apply it to the other files.
* We can pass a file name pattern to `spark.read.csv` and read all the data in the files under **hdfs://public/airlines_all/airlines** into a Data Frame.
* We can use options such as `header` and `inferSchema` to assign names and data types.
* However, `inferSchema` goes through the entire data to assign the schema. We can use `samplingRatio` to process only a fraction of the data while inferring the schema.
* If the data in all the files has the same structure, we can get the schema from one file and then apply it to the others.
* In our airlines data the schema is consistent across all the files, so we can get the schema by going through one file and apply it to the entire dataset.

In [10]:
airlines_part_00000 = spark.read. \
    csv("/public/airlines_all/airlines/part-00000",
        header=True,
        inferSchema=True
       )

In [11]:
type(airlines_part_00000)

pyspark.sql.dataframe.DataFrame

In [12]:
airlines_part_00000.show(truncate=False)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|IsArrDelayed|IsDepDelayed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|1987|10   |14  

In [13]:
airlines_part_00000.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

In [14]:
airlines_schema = spark.read. \
    csv("/public/airlines_all/airlines/part-00000",
        header=True,
        inferSchema=True
       ). \
    schema

In [15]:
type(airlines_schema)

pyspark.sql.types.StructType

In [16]:
help(spark.read.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, samplingRatio=None, enforceSchema=None, emptyValue=None) method of pyspark.sql.readwriter.DataFrameReader instance
    Loads a CSV file and returns the result as a  :class:`DataFrame`.
    
    This function will go through the input once to determine the input schema if
    ``inferSchema`` is enabled. To avoid going through the entire data once, disable
    ``inferSchema`` option or specify the schema explicitly using ``schema``.
    
    :param path: string, or list of strings, for input 

In [18]:
# The schema is supplied explicitly, so nothing is inferred and samplingRatio is not needed.
airlines = spark.read. \
    schema(airlines_schema). \
    csv("/public/airlines_all/airlines/part*",
        header=True
       )

In [19]:
help(airlines)

Help on DataFrame in module pyspark.sql.dataframe object:

class DataFrame(builtins.object)
 |  A distributed collection of data grouped into named columns.
 |  
 |  A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
 |  and can be created using various functions in :class:`SparkSession`::
 |  
 |      people = spark.read.parquet("...")
 |  
 |  Once created, it can be manipulated using the various domain-specific-language
 |  (DSL) functions defined in: :class:`DataFrame`, :class:`Column`.
 |  
 |  To select a column from the :class:`DataFrame`, use the apply method::
 |  
 |      ageCol = people.age
 |  
 |  A more concrete example::
 |  
 |      # To create DataFrame using SparkSession
 |      people = spark.read.parquet("...")
 |      department = spark.read.parquet("...")
 |  
 |      people.filter(people.age > 30).join(department, people.deptId == department.id) \
 |        .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
 |  
 |  .. ve

In [None]:
airlines.count()

## Previewing airlines data
Let us preview the airlines data to understand more about it.
* As we have too many files, we will infer the schema from just one file and use it to read and preview the data.
* File Name: **hdfs://public/airlines_all/airlines/part-00000**
* `spark.read.csv` returns an object of type Data Frame.



In [20]:
airlines_schema = spark.read. \
    csv("/public/airlines_all/airlines/part-00000",
        header=True,
        inferSchema=True
       ). \
    schema

In [21]:
airlines = spark.read. \
    schema(airlines_schema). \
    csv("/public/airlines_all/airlines/part*",
        header=True
       )

A Data Frame has a structure, also called a schema.

* We can print the schema using `airlines.printSchema()`
* We can preview the data using `airlines.show()`. By default it shows 20 records, and long column values are truncated for readability.
* We can review the details of `show` by using `help(airlines.show)`
* We can pass a custom number of records along with `truncate=False` to show the complete values of all the requested records. This lets us preview all the columns for the desired number of records.




In [22]:
airlines.show(10, truncate=False)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|IsArrDelayed|IsDepDelayed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|2001|8    |3   

* We can get the number of records or rows in a Data Frame using `airlines.count()`
* In a Databricks Notebook, we can use `display` to preview the data using the Visualization feature.
* We can perform all kinds of standard transformations on our data. To do so, we need good knowledge of the functions on Data Frames as well as the functions on columns.
* Let us also check whether there are duplicates in our data. If there are, we will remove them while reorganizing the data later.


In [23]:
airlines_schema = spark.read. \
    csv("/public/airlines_all/airlines/part-00000",
        header=True,
        inferSchema=True
       ). \
    schema

In [24]:
airlines = spark.read. \
    schema(airlines_schema). \
    csv("/public/airlines_all/airlines/part-0000*",
        header=True
       )

In [25]:
airlines.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

In [26]:
airlines.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|IsArrDelayed|IsDepDelayed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|1987|   12|    

In [None]:
airlines.show(100, truncate=False)

In [27]:
airlines.count()

6489231

In [28]:
airlines.distinct().count()

6489146

## Overview of Data Frame APIs

Let us get an overview of Data Frame APIs to process data in Data Frames.
* Row Level Transformations or Projection of Data can be done using `select`, `selectExpr`, `withColumn`, `drop` on Data Frame.
* We typically apply functions from `pyspark.sql.functions` on columns using `select` and `withColumn`
* Filtering is typically done either by using `filter` or `where` on Data Frame.
* We can pass the condition to `filter` or `where` either by using SQL Style or Programming Language Style.
* Global Aggregations can be performed directly on the Data Frame.
* By Key or Grouping Aggregations are typically performed using `groupBy` and then aggregate functions using `agg`
* We can sort the data in Data Frame using `sort` or `orderBy`
* We will talk about Window Functions later. We can use Window Functions for some advanced Aggregations and Ranking.

### Tasks

Let us understand how to project the data using different options such as `select`, `selectExpr`, `withColumn` and `drop`.

* Create Dataframe **employees** using Collection

In [29]:
employees = [(1, "Scott", "Tiger", 1000.0, "united states"),
             (2, "Henry", "Ford", 1250.0, "India"),
             (3, "Nick", "Junior", 750.0, "united KINGDOM"),
             (4, "Bill", "Gomes", 1500.0, "AUSTRALIA")
            ]

In [30]:
employeesDF = spark. \
    createDataFrame(employees,
                    schema="""employee_id INT, first_name STRING, 
                    last_name STRING, salary FLOAT, nationality STRING"""
                   )

In [31]:
employeesDF.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- salary: float (nullable = true)
 |-- nationality: string (nullable = true)



In [32]:
employeesDF.show()

+-----------+----------+---------+------+--------------+
|employee_id|first_name|last_name|salary|   nationality|
+-----------+----------+---------+------+--------------+
|          1|     Scott|    Tiger|1000.0| united states|
|          2|     Henry|     Ford|1250.0|         India|
|          3|      Nick|   Junior| 750.0|united KINGDOM|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|
+-----------+----------+---------+------+--------------+



* Project employee first name and last name.


In [33]:
employeesDF. \
    select("first_name", "last_name"). \
    show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|     Scott|    Tiger|
|     Henry|     Ford|
|      Nick|   Junior|
|      Bill|    Gomes|
+----------+---------+



* Project all the fields except for Nationality

In [34]:
employeesDF. \
    drop("nationality"). \
    show()

+-----------+----------+---------+------+
|employee_id|first_name|last_name|salary|
+-----------+----------+---------+------+
|          1|     Scott|    Tiger|1000.0|
|          2|     Henry|     Ford|1250.0|
|          3|      Nick|   Junior| 750.0|
|          4|      Bill|    Gomes|1500.0|
+-----------+----------+---------+------+



**We will explore most of the APIs to process data in Data Frames as we get into the data processing at a later point in time**

## Overview of Functions

Let us get an overview of different functions that are available to process data in columns.
* While Data Frame APIs work on the Data Frame, at times we might want to apply functions on column values.
* Functions to process column values are available under `pyspark.sql.functions`. These are typically used in `select` or `withColumn` on top of a Data Frame.
* There are approximately 300 pre-defined functions available for us.
* Some of the important functions can be broadly categorized into String Manipulation, Date Manipulation, Numeric Functions and Aggregate Functions.
* String Manipulation Functions
  * Concatenating Strings - `concat`
  * Getting Length - `length`
  * Trimming Strings - `trim`, `rtrim`, `ltrim`
  * Padding Strings - `lpad`, `rpad`
  * Extracting Strings - `split`, `substring`
* Date Manipulation Functions
  * Date Arithmetic - `date_add`, `date_sub`, `datediff`, `add_months`
  * Date Extraction - `dayofmonth`, `month`, `year`
  * Get beginning period - `trunc`, `date_trunc`
* Numeric Functions - `abs`, `greatest`
* Aggregate Functions - `sum`, `min`, `max`

### Tasks
Let us perform a task to understand how functions are typically used.

* Project full name by concatenating first name and last name along with other fields excluding first name and last name.

In [35]:
from pyspark.sql.functions import lit, concat

employeesDF. \
    withColumn("full_name", concat("first_name", lit(", "), "last_name")). \
    drop("first_name", "last_name"). \
    show()

+-----------+------+--------------+------------+
|employee_id|salary|   nationality|   full_name|
+-----------+------+--------------+------------+
|          1|1000.0| united states|Scott, Tiger|
|          2|1250.0|         India| Henry, Ford|
|          3| 750.0|united KINGDOM|Nick, Junior|
|          4|1500.0|     AUSTRALIA| Bill, Gomes|
+-----------+------+--------------+------------+



In [None]:
# Alternative approach - using select with alias to name the derived column
employeesDF. \
    select("employee_id",
           concat("first_name", lit(", "), "last_name").alias("full_name"),
           "salary",
           "nationality"
          ). \
    show()

In [None]:
# Alternative approach - using selectExpr with a SQL style expression
employeesDF. \
    selectExpr("employee_id",
               "concat(first_name, ', ', last_name) AS full_name",
               "salary",
               "nationality"
              ). \
    show()

**We will explore most of the functions as we get into the data processing at a later point in time**

## Overview of Spark Write APIs

Let us understand how we can write Data Frames to different file formats.
* All the batch write APIs are grouped under `write`, which is exposed on Data Frame objects.
* All APIs are exposed under `write` of the Data Frame (for example, `df.write` for a Data Frame named `df`).
  * `text` - to write single column data to text files.
  * `csv` - to write to text files with delimiters. Default is a comma, but we can use other delimiters as well.
  * `json` - to write data to JSON files
  * `orc` - to write data to ORC files
  * `parquet` - to write data to Parquet files.
* We can also write data to other file formats by plugging in the relevant package and using `write.format`, for example **avro**.
* We can use options based on the file format to which we are writing the Data Frame.
  * `compression` - compression codec (`gzip`, `snappy`, etc.)
  * `sep` - to specify the delimiter while writing to text files using **csv**
* We can `overwrite` existing directories or `append` to them using `mode`.
* Create a copy of orders data in **parquet** file format with no compression. If the folder already exists, overwrite it. Target Location: **/user/[YOUR_USER_NAME]/retail_db/orders**
* When you pass options, typos in option names are ignored rather than causing a failure. Be careful and make sure that the output is validated.
* By default, the number of files in the output directory is equal to the number of tasks used to process the data in the last stage. However, we might want to control the number of files so that we don't run into the too many small files issue.
* We can control the number of files by using `coalesce`. It has to be invoked on the Data Frame before invoking `write`.

In [38]:
orders = spark. \
    read. \
    csv('/public/retail_db/orders',
        header=True,
        schema='''
            order_id INT, 
            order_date STRING, 
            order_customer_id INT, 
            order_status STRING
        '''
       )

In [None]:
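# /user/training is only accessible to the training user,
# so write under your own home directory (username is set in the first cell).
orders. \
    write. \
    parquet(f'/user/{username}/retail_db/orders', 
            mode='overwrite', 
            compression='none'
           )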

In [None]:
# Alternative approach - using option
# The target is under your own home directory (username is set in the first cell).
orders. \
    write. \
    mode('overwrite'). \
    option('compression', 'none'). \
    parquet(f'/user/{username}/retail_db/orders')

In [None]:
# Alternative approach - using format
# The target is under your own home directory (username is set in the first cell).
orders. \
    write. \
    mode('overwrite'). \
    option('compression', 'none'). \
    format('parquet'). \
    save(f'/user/{username}/retail_db/orders')

In [41]:
%%sh

# Assumes the shell user ($USER) is the same user the data was written as.
# File extension should not contain compression algorithms such as snappy.
hdfs dfs -ls /user/${USER}/retail_db/orders

* Read order_items data from **/public/retail_db_json/order_items** and write it to pipe delimited files with gzip compression. Target Location: **/user/[YOUR_USER_NAME]/retail_db/order_items**. Make sure to validate.
* Ignore the error if the target location already exists. Also make sure to write into only one file. We can use `coalesce` for it. 

**`coalesce` will be covered in detail at a later point in time**

In [42]:
order_items = spark. \
    read. \
    json('/public/retail_db_json/order_items')

In [43]:
# Using format
order_items. \
    coalesce(1). \
    write. \
    mode('ignore'). \
    option('compression', 'gzip'). \
    option('sep', '|'). \
    format('csv'). \
    save('/user/training/retail_db/order_items')

In [None]:
# Alternative approach - using keyword arguments
order_items. \
    coalesce(1). \
    write. \
    csv('/user/training/retail_db/order_items',
        sep='|',
        mode='ignore',
        compression='gzip'
       )

In [None]:
%%sh

hdfs dfs -ls /user/training/retail_db/order_items
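
Besides listing the target directory, one way to validate the copy is to read the written files back and compare record counts with the source. The helper below is a minimal sketch and not part of the original notebook: `validate_copy` is a hypothetical name, and it assumes an active `SparkSession` is passed in as `spark`.

```python
def validate_copy(spark, source_path, target_path, sep='|'):
    """Return (counts_match, source_count, target_count) for the copied data.

    Hypothetical helper: `spark` is assumed to be an active SparkSession.
    """
    source_count = spark.read.json(source_path).count()
    # Spark decompresses .gz part files transparently for text-based formats,
    # so no compression option is needed while reading.
    target_count = spark.read.csv(target_path, sep=sep).count()
    return source_count == target_count, source_count, target_count


# Example usage with the paths from the exercise above:
# validate_copy(spark,
#               '/public/retail_db_json/order_items',
#               '/user/training/retail_db/order_items')
```

If the counts differ, a likely cause is that the target location already existed, in which case the `ignore` mode skips the write and leaves the older data in place.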

## Reorganizing airlines data

Let us reorganize our airlines data into fewer files where the data is compressed and partitioned by month.
* We have ~1920 files of ~64 MB each.
* The data ranges from October 1987 to December 2008 (255 months).
* By default Spark uses ~1920 tasks to process the data, and it might end up with too many small files. We can avoid that by using `repartition` and then partitioning by month.
* Here are the steps we are going to follow to partition by flight month and save the data to /user/[YOUR_USER_NAME]/airlines-part.
  * Read one file first and get the schema.
  * Read the entire data set by applying the schema from the previous step.
  * Add an additional column `flightmonth` using `withColumn`, combining `lpad` on the month column with `concat`. We need this because the month in our data set is an integer, and we want to pad the months up to September with 0 to get the YYYYMM format.
  * Repartition the data into 255 partitions, based on the number of months, using `flightmonth`.
  * Partition the data using `partitionBy` while writing it to the target location.
  * We will use the Parquet file format, which compresses data with the Snappy algorithm by default.

**This process will take time. Once it is done, we will review the target location to which the data is copied, partitioned by month.**
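
To make the `flightmonth` step concrete, here is a small plain-Python sketch of what `concat` with `lpad(month, 2, '0')` produces for a single record, along with a check that the date range covers 255 months. The function name `flight_month` is hypothetical and only mirrors the Spark expression; it does not run on the cluster.

```python
def flight_month(year, month):
    """Build a YYYYMM string, mirroring concat(year, lpad(month, 2, '0'))."""
    return f"{year}{str(month).rjust(2, '0')}"


# Every (year, month) pair from October 1987 through December 2008.
flight_months = [
    flight_month(year, month)
    for year in range(1987, 2009)
    for month in range(1, 13)
    if (year, month) >= (1987, 10)
]

print(flight_month(2008, 1))   # 200801
print(flight_months[0])        # 198710
print(flight_months[-1])       # 200812
print(len(flight_months))      # 255
```

Each of these 255 values becomes one `flightmonth=YYYYMM` directory under the target location once the data is written with `partitionBy`.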

In [44]:
from pyspark.sql import SparkSession

# spark.stop()
spark = SparkSession. \
    builder. \
    config('spark.dynamicAllocation.enabled', 'false'). \
    config('spark.executor.instances', 40). \
    appName('Data Processing - Overview'). \
    master('yarn'). \
    getOrCreate()

In [45]:
spark

In [46]:
from pyspark.sql.functions import concat, lpad

airlines_schema = spark.read. \
    csv('/public/airlines_all/airlines/part-00000',
        header=True,
        inferSchema=True
       ). \
    schema

In [None]:
airlines = spark.read. \
    schema(airlines_schema). \
    csv('/public/airlines_all/airlines/part*',
        header=True
       )

In [None]:
airlines.printSchema()

In [None]:
airlines.show()

In [None]:
help(airlines.write.parquet)

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", "255")
airlines. \
    distinct(). \
    withColumn('flightmonth', concat('year', lpad('month', 2, '0'))). \
    repartition(255, 'flightmonth'). \
    write. \
    mode('overwrite'). \
    partitionBy('flightmonth'). \
    format('parquet'). \
    save('/user/training/airlines-part')

## Previewing reorganized data
Let us preview the reorganized data.
* We will use the new location going forward - **/public/airlines_all/airlines-part**. The data is already copied into that location.
* We have partitioned the data by month and stored it in that location.
* Instead of using the complete data set, we will read the data from one partition: **/public/airlines_all/airlines-part/flightmonth=200801**
* First let us create a DataFrame object, say `airlines`, using `spark.read.parquet("/public/airlines_all/airlines-part/flightmonth=200801")`.
* We can get the schema of the DataFrame using `airlines.printSchema()`.
* Use `airlines.show()` or `airlines.show(100, truncate=False)` to preview the data.
* We can also use `display(airlines)` to get airlines data in tabular format in a Databricks notebook.
* We can also use `airlines.describe().show()` to get some statistics about the DataFrame and `airlines.count()` to get the number of records in it.
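
One detail worth knowing when reading a single partition directory: Spark does not add the partition column (`flightmonth` here) to the DataFrame unless it is told where the partitioned data set starts. The sketch below uses the `basePath` read option for that. The helper name `read_month` is hypothetical, and `spark` is assumed to be an active `SparkSession`.

```python
def read_month(spark, base_path, flightmonth):
    """Read one month of data while keeping flightmonth as a column.

    Hypothetical helper: `spark` is assumed to be an active SparkSession and
    `base_path` the root of the data set partitioned by flightmonth.
    """
    partition_path = f"{base_path}/flightmonth={flightmonth}"
    # basePath tells Spark where partition discovery should start, so the
    # flightmonth value encoded in the directory name is kept as a column.
    return spark.read.option('basePath', base_path).parquet(partition_path)


# Example usage with the location described above:
# airlines = read_month(spark, '/public/airlines_all/airlines-part', '200801')
# airlines.printSchema()
```

Reading the partition directory directly, as done in the rest of this section, works just as well when the `flightmonth` column is not needed.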

## Analyze and Understand Data
Let us analyze and understand the data in more detail using the data for January 2008.
* First let us read the data for January 2008.

In [47]:
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

airlines = spark. \
    read. \
    parquet(airlines_path)

In [None]:
airlines.count()

In [None]:
airlines.printSchema()

* Get number of records - `airlines.count()`
* Go through the list of columns and understand their purpose.
  * Year
  * Month
  * DayOfMonth
  * CRSDepTime - Scheduled Departure Time
  * DepTime - Actual Departure Time
  * DepDelay - Departure Delay in Minutes
  * CRSArrTime - Scheduled Arrival Time
  * ArrTime - Actual Arrival Time
  * ArrDelay - Arrival Delay in Minutes
  * UniqueCarrier - Carrier or Airlines
  * FlightNum - Flight Number
  * Distance - Distance between Origin and Destination
  * IsDepDelayed - set to yes for flights where departure is delayed
  * IsArrDelayed - set to yes for flights where arrival is delayed
* Get number of unique origins

In [None]:
airlines. \
    select("Origin"). \
    distinct(). \
    count()

* Get number of unique destinations

In [None]:
airlines. \
    select("Dest"). \
    distinct(). \
    count()

* Get all unique carriers

In [None]:
airlines. \
    select('UniqueCarrier'). \
    distinct(). \
    show()
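
The three cells above follow the same `select`, `distinct`, `count` pattern, so it can be wrapped in a small helper. This is only a sketch under the assumption that `airlines` is the DataFrame read earlier; `distinct_counts` is a hypothetical name, not an existing Spark API.

```python
def distinct_counts(df, columns):
    """Return a dict mapping each column name to its number of distinct values.

    Hypothetical helper: `df` is assumed to be a Spark DataFrame. Each column
    triggers its own Spark job, so keep the list of columns short.
    """
    return {column: df.select(column).distinct().count() for column in columns}


# Example usage with the columns explored above:
# distinct_counts(airlines, ['Origin', 'Dest', 'UniqueCarrier'])
```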

## Conclusion
Let us recap the key takeaways from this module.
* APIs to read the data from files into a Data Frame.
* Previewing the schema and the data in a Data Frame.
* Overview of Data Frame APIs and functions.
* Writing data from a Data Frame into files.
* Reorganizing the airlines data by month.
* Simple APIs to analyze the data.

Now it is time for us to deep dive into the APIs to perform all the standard transformations as part of Data Processing.
