# Spark DataFrame

## Overview

The previous section showed you how to read a CSV file into Spark. The result is stored in a data structure called a
DataFrame. A Spark DataFrame is like a distributed in-memory table [1]. As a table-like structure, it has
columns, and each column has a specific data type.

DataFrames play a key role in developing Spark applications. In this section we will go over the core elements you need
to know in order to work efficiently with them.

## Spark DataFrame

DataFrames are immutable, which allows Spark to keep a lineage of all the transformations applied to them.
Every DataFrame has a schema [1], which defines the column names and their associated data types.
When reading data from a source, we can either let Spark infer the schema, as we did in the previous section, or
specify the schema explicitly. The latter approach has two distinct benefits [1]:

- Inferring the schema can be time consuming: Spark needs to create a separate job and read a large portion of the data just to work out the column types.
- Providing the schema means we can detect early if the data doesn't match the expected schema.

Let's see how an application can provide the schema of a dataset in Spark. There are two ways to do so:

- Employ a data definition language (DDL) string
- Define it programmatically

The script below shows the first approach:

```
"""Loads a CSV file into Spark using an explicit schema.
This application is meant to be submitted to
Spark for execution.
"""
from pathlib import Path
import sys

from pyspark.sql import SparkSession

APP_NAME = "LOAD_CSV_FILE_TO_SPARK"

if __name__ == '__main__':

    # the file to read is expected as the only commandline argument
    if len(sys.argv) != 2:
        print("Usage: filename <file>", file=sys.stderr)
        sys.exit(1)

    # get a spark session
    spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

    # where is the file to read
    filename = Path(sys.argv[1])

    print(f"Loading file {filename}")

    # set the schema using DDL
    schema = "`OBJECTID` INT, `TRIPTYPE` INT, "\
             "`PROVIDERNAME` STRING, `FAREAMOUNT` FLOAT,"\
             "`GRATUITYAMOUNT` FLOAT, `SURCHARGEAMOUNT` FLOAT, "\
             "`EXTRAFAREAMOUNT` FLOAT, `TOLLAMOUNT` FLOAT, "\
             "`TOTALAMOUN` FLOAT, `PAYMENTTYPE` STRING,"\
             "`ORIGINCITY` STRING, `ORIGINSTATE` STRING,"\
             "`ORIGINZIP` STRING, `DESTINATIONCITY` STRING,"\
             "`DESTINATIONSTATE` STRING, `DESTINATIONZIP` STRING,"\
             "`MILEAGE` STRING, `DURATION` STRING,"\
             "`ORIGIN_BLOCK_LATITUDE` STRING, `ORIGIN_BLOCK_LONGITUDE` STRING,"\
             "`ORIGIN_BLOCKNAME` STRING, `DESTINATION_BLOCK_LATITUDE` STRING,"\
             "`DESTINATION_BLOCK_LONGITUDE` STRING, `DESTINATION_BLOCKNAME` STRING,"\
             "`AIRPORT` STRING, `ORIGINDATETIME_TR` STRING, `DESTINATIONDATETIME_TR` STRING"

    # read the file into a Spark DataFrame
    # the schema is supplied explicitly, so inference is
    # switched off; the file is assumed to contain a header
    csv_df = (spark.read.format("csv")
              .option("header", "true")
              .option("inferSchema", False)
              .option("delimiter", "|")
              .schema(schema)
              .load(str(filename)))

    # printSchema() prints the schema itself and returns None,
    # so it should not be embedded in an f-string
    print("Schema used:")
    csv_df.printSchema()
    spark.stop()

```

The script prints the following schema:

```
root
 |-- OBJECTID: integer (nullable = true)
 |-- TRIPTYPE: integer (nullable = true)
 |-- PROVIDERNAME: string (nullable = true)
 |-- FAREAMOUNT: float (nullable = true)
 |-- GRATUITYAMOUNT: float (nullable = true)
 |-- SURCHARGEAMOUNT: float (nullable = true)
 |-- EXTRAFAREAMOUNT: float (nullable = true)
 |-- TOLLAMOUNT: float (nullable = true)
 |-- TOTALAMOUN: float (nullable = true)
 |-- PAYMENTTYPE: string (nullable = true)
 |-- ORIGINCITY: string (nullable = true)
 |-- ORIGINSTATE: string (nullable = true)
 |-- ORIGINZIP: string (nullable = true)
 |-- DESTINATIONCITY: string (nullable = true)
 |-- DESTINATIONSTATE: string (nullable = true)
 |-- DESTINATIONZIP: string (nullable = true)
 |-- MILEAGE: string (nullable = true)
 |-- DURATION: string (nullable = true)
 |-- ORIGIN_BLOCK_LATITUDE: string (nullable = true)
 |-- ORIGIN_BLOCK_LONGITUDE: string (nullable = true)
 |-- ORIGIN_BLOCKNAME: string (nullable = true)
 |-- DESTINATION_BLOCK_LATITUDE: string (nullable = true)
 |-- DESTINATION_BLOCK_LONGITUDE: string (nullable = true)
 |-- DESTINATION_BLOCKNAME: string (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- ORIGINDATETIME_TR: string (nullable = true)
 |-- DESTINATIONDATETIME_TR: string (nullable = true)
```

Specifying the schema programmatically is more involved, as one needs to use Spark's data types. A snippet
showing how to do this is given below.


```
...

# Spark data types needed to specify the schema programmatically
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
...

    # specify the schema programmatically
    schema = StructType([StructField("OBJECTID", IntegerType(), False),
                         StructField("PROVIDERNAME", StringType(), False),
                         
                         ...
                         ])
```

## Create a DataFrame

So far we have been using the read methods available in Spark in order to create a DataFrame. However, we can explicitly create a 
DataFrame without reading a file. This is shown in the script below.

```
"""Creates a Spark DataFrame from in-memory data.
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

APP_NAME = "CREATE_SPARK_DATAFRAME"

if __name__ == '__main__':

    # get a spark session
    spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

    # the schema could alternatively be given as a DDL string:
    # schema = "`OBJECTID` INT, `TRIPTYPE` INT, `PROVIDERNAME` STRING, `FAREAMOUNT` FLOAT"

    # specify the schema programmatically
    schema = StructType([StructField("OBJECTID", IntegerType(), False),
                         StructField("TRIPTYPE", IntegerType(), False),
                         StructField("PROVIDERNAME", StringType(), False),
                         StructField("FAREAMOUNT", FloatType(), False)])

    # every inner list is one row; values follow the order of the schema
    data = [[1, 3, "NEW-YORK", 20.0],
            [2, 2, "CAMBRIDGE", 18.2],
            [3, 3, "NEW-YORK", 20.0],
            [4, 2, "LONDON", 25.0],
            [5, 2, "OXFORD", 15.0]]

    df = spark.createDataFrame(data, schema)
    df.show()

    # printSchema() prints the schema itself and returns None
    df.printSchema()

    spark.stop()

```

Running the script produces the following:

```
+--------+--------+------------+----------+
|OBJECTID|TRIPTYPE|PROVIDERNAME|FAREAMOUNT|
+--------+--------+------------+----------+
|       1|       3|    NEW-YORK|      20.0|
|       2|       2|   CAMBRIDGE|      18.2|
|       3|       3|    NEW-YORK|      20.0|
|       4|       2|      LONDON|      25.0|
|       5|       2|      OXFORD|      15.0|
+--------+--------+------------+----------+

root
 |-- OBJECTID: integer (nullable = false)
 |-- TRIPTYPE: integer (nullable = false)
 |-- PROVIDERNAME: string (nullable = false)
 |-- FAREAMOUNT: float (nullable = false)

```

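## Summary

This section introduced the Spark DataFrame: an immutable, distributed, table-like structure whose schema defines the column names
and their data types. A schema can be inferred by Spark or supplied explicitly, either as a DDL string or programmatically using
Spark's data types. A DataFrame can also be created directly from in-memory data, without reading a file.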

## References

1. Jules S. Damji, Brooke Wenig, Tathagata Das, Denny Lee, _Learning Spark: Lightning-Fast Data Analytics_, 2nd Edition, O'Reilly.