# Apache Spark's Structured APIs

## Spark: What's Underneath an RDD?

The RDD is the most basic abstraction in Spark. An RDD has three vital characteristics:
* Dependencies
* Partitions (with some locality information)
* Compute function: `Partition => Iterator[T]`

All three are integral to the simple RDD programming API model upon which all higher-level functionality is constructed.
1. A list of *dependencies* instructs Spark how an RDD is constructed from its inputs. When it needs to reproduce results, Spark can recreate an RDD from these dependencies and replay the operations on it. This gives RDDs **resiliency**.
2. *Partitions* give Spark the ability to split the work and parallelize computation across executors. In some cases (for example, when reading from HDFS), Spark uses locality information to send work to executors close to the data, which reduces the amount of data transmitted over the network.
3. RDDs also have a *compute function* that produces an `Iterator[T]` for the data that will be stored in the RDD.
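
To make these three characteristics concrete, here is a minimal pure-Python sketch of an RDD-like object. It is a toy model under our own assumptions: the `ToyRDD` class and its `parallelize`, `map`, and `collect` methods are hypothetical and are not PySpark's implementation. Each instance records its parent *dependencies*, a list of *partitions*, and a *compute function* that turns one partition into an iterator.

```python
from typing import Callable, Iterator, List, Optional


class ToyRDD:
    """Hypothetical, simplified model of an RDD (not PySpark's class)."""

    def __init__(self, partitions: List[int],
                 compute: Callable[[int], Iterator],
                 dependencies: Optional[List["ToyRDD"]] = None):
        self.partitions = partitions            # partition indices
        self.compute = compute                  # partition index -> iterator of records
        self.dependencies = dependencies or []  # lineage: the parent RDDs

    @staticmethod
    def parallelize(data: list, num_partitions: int) -> "ToyRDD":
        # Split the data round-robin into num_partitions slices
        slices = [data[i::num_partitions] for i in range(num_partitions)]
        return ToyRDD(list(range(num_partitions)), lambda p: iter(slices[p]))

    def map(self, f: Callable) -> "ToyRDD":
        # The child keeps the same partitions, records `self` as its
        # dependency, and computes a partition by applying f lazily
        # to the parent's iterator for that partition.
        parent = self
        return ToyRDD(self.partitions,
                      lambda p: (f(x) for x in parent.compute(p)),
                      dependencies=[parent])

    def collect(self) -> list:
        # Run every partition's compute function and gather the records
        return [x for p in self.partitions for x in self.compute(p)]


rdd = ToyRDD.parallelize([1, 2, 3, 4, 5], num_partitions=2)
doubled = rdd.map(lambda x: x * 2)
print(sorted(doubled.collect()))  # [2, 4, 6, 8, 10]
print(len(doubled.dependencies))  # 1
```

Because `doubled` keeps a reference to its parent along with a way to recompute each partition, a lost partition could in principle be rebuilt from that lineage, which is the idea behind item 1. Note also that `map` can only *call* the lambda it receives; it has no way to inspect what the lambda does.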

Simple and elegant! However, this model has a couple of problems. First, Spark does not know *what* you are doing in the compute function; in other words, the compute function is **opaque** to Spark. Whether you are performing a join, filter, select, or aggregation, Spark sees it only as a `lambda` expression.

A second problem arises with Python RDDs: the `Iterator[T]` data type is also opaque. Spark only knows that it is a generic Python object. Because Spark can inspect neither the computation nor the data, it has no way to optimize the expression, and it can only serialize the opaque objects as raw bytes.

## Structuring Spark

There are a few key schemes for structuring Spark:
- Express computations using common patterns found in data analysis, such as filtering, selecting, counting, aggregating, averaging, and grouping.
- Narrow this further with a set of common operators in a DSL, available as APIs in Spark's supported languages. These operators let you tell Spark specifically *what* you want to compute.
- Arrange data in an ordered, structured tabular format, like a SQL table or a spreadsheet, using Spark's own supported data types.

### Key Merits and Benefits

Structure yields a number of benefits, including better performance and space efficiency across Spark components. We will explore these benefits further when we talk about the use of the DataFrame and Dataset APIs shortly, but for now we'll concentrate on the other advantages: expressivity, simplicity, composability, and uniformity.


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [7]:
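# Create (or reuse) a SparkSession and get its SparkContext
spark = SparkSession.builder.appName("AuthorsAges").getOrCreate()
sc = spark.sparkContext

# Create an RDD of tuples (name, age)
dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30),
                         ("TD", 35), ("Brooke", 25)])
# Use map and reduceByKey transformations with their lambda
# expressions to aggregate and then compute average
agesRDD = (dataRDD
          .map(lambda x: (x[0], (x[1], 1)))
          .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
          .map(lambda x: (x[0], x[1][0] / x[1][1])))
# Trigger execution and bring the (small) result back to the driver
print(agesRDD.collect())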
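
The lambdas above are compact but cryptic. The following plain-Python sketch (no Spark involved; `reduce_by_key` is a hypothetical helper standing in for `reduceByKey`) spells out what each step computes, which makes the `(sum, count)` trick explicit:

```python
def reduce_by_key(pairs, func):
    """Hypothetical stand-in for reduceByKey: merge values that share a key."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


data = [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)]

# Step 1 (map): pair each age with a count of 1 -> (name, (age, 1))
with_counts = [(name, (age, 1)) for name, age in data]

# Step 2 (reduceByKey): add up ages and counts per name -> {name: (total, n)}
totals = reduce_by_key(with_counts, lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Step 3 (map): divide each total by its count to get the average
averages = {name: total / n for name, (total, n) in totals.items()}
print(averages)  # {'Brooke': 22.5, 'Denny': 31.0, 'Jules': 30.0, 'TD': 35.0}
```

Unlike this single-process loop, Spark's `reduceByKey` merges values within each partition first and then combines the partial results across partitions.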

In [10]:
# Create a DataFrame using SparkSession 
spark = (SparkSession
      .builder
      .appName("AuthorsAges")
      .getOrCreate())

# Create a DataFrame
data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30),
                                 ("TD", 35), ("Brooke", 25)], ["name", "age"])

# Group the same names together, aggregate their ages, and compute an average 
avg_df = data_df.groupBy("name").agg(avg("age"))
# Show the results of the final execution
avg_df.show()


+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Denny|    31.0|
| Jules|    30.0|
|    TD|    35.0|
+------+--------+


This was accomplished using high-level DSL operators and APIs to tell Spark what to do. Because Spark can inspect or parse this query and understand our intention, it can optimize or arrange the operations for efficient execution. Spark knows exactly what we wish to do: group people by their names, aggregate their ages, and then compute the average age of all people with the same name.

Some of you might feel that by using only high-level, expressive DSL operators mapped to common or recurring data analysis patterns to introduce order and structure, we are limiting the scope of developers' ability to instruct the compiler or control how their queries should be computed.

Rest assured that you are not confined to these structured patterns; you can switch back at any time to the unstructured, low-level RDD API, although we hardly ever find a need to do so. As well as being simpler to read, the structure of Spark's high-level APIs also introduces uniformity across its components and languages. For example, the Scala code shown here does the same thing as the previous Python code, and the API looks nearly identical:

    // In Scala
    import org.apache.spark.sql.functions.avg
    import org.apache.spark.sql.SparkSession
    // Create a DataFrame using SparkSession
    val spark = SparkSession
      .builder
      .appName("AuthorsAges")
      .getOrCreate()
    // Create a DataFrame of names and ages
    val dataDF = spark.createDataFrame(Seq(("Brooke", 20), ("Brooke", 25), ("Denny", 31), ("Jules", 30), ("TD", 35))).toDF("name", "age")
    // Group the same names together, aggregate their ages, and compute an average
    val avgDF = dataDF.groupBy("name").agg(avg("age")) 
    // Show the results of the final execution
    avgDF.show()
    
Some of these DSL operators perform relational-like operations that you'll be familiar with if you know SQL, such as selecting, filtering, grouping, and aggregation. All of this simplicity and expressivity that we developers cherish is possible because of the Spark SQL engine upon which the high-level Structured APIs are built. It is because of this engine, which underpins all the Spark components, that we get uniform APIs. Whether you express a query against a DataFrame in Structured Streaming or MLlib, you are always transforming and operating on DataFrames as structured data.

## The DataFrame API

Inspired by pandas DataFrames in structure, format, and a few specific operations, Spark DataFrames are like distributed in-memory tables with named columns and schemas, where each column has a specific data type: integer, string, array, map, real, date, timestamp, etc.

An example Spark DataFrame, shown in tabular form:

| Id (Int) | First (String) | Last (String) | Url (String) | Published (Date) | Hits (Int) | Campaigns (List[String]) |
| -------- | ---------- | ----------- | --------- | -------------- | --------- | ----------- |
| 1 | Jules | Damji | https://tinyurl.1 | 1/4/2016 | 4535 | [twitter,LinkedIn] | 
| 2 | Brooke | Wenig | https://tinyurl.2 | 5/5/2018 | 8908 | [twitter,LinkedIn] | 
| 3 | Denny | Lee | https://tinyurl.3 | 6/7/2019 | 7659 | [web, twitter, FB, LinkedIn] | 
| 4 | Tathagata | Das | https://tinyurl.4 | 5/12/2018 | 10568 | [twitter, FB] | 
| 5 | Matei | Zaharia | https://tinyurl.5 | 5/14/2014 | 40578 | [web, twitter, FB, LinkedIn] |
| 6 | Reynold | Xin | https://tinyurl.6 | 3/2/2015 | 25568 | [twitter, LinkedIn] |

When data is visualized as a structured table, it's not only easy to digest but also easy to work with when it comes to common operations you might want to execute on rows and columns. You can add or change the names and data types of the columns, creating new DataFrames while the previous versions are preserved. A named column in a DataFrame and its associated Spark data type can be declared in the schema.

### Spark's Basic Data Types

Matching its supported programming languages, Spark supports basic internal data types. These data types can be declared in your Spark application or defined in your schema. For example, in Scala, you can define or declare a particular column name to be of type `String`, `Byte`, `Long`, or `Map`, etc. Here, we define variable names tied to a Spark data type:

    $SPARK_HOME/bin/spark-shell
    scala> import org.apache.spark.sql.types._
    import org.apache.spark.sql.types._
    scala> val nameTypes = StringType
    nameTypes: org.apache.spark.sql.types.StringType.type = StringType 
    scala> val firstName = nameTypes
    firstName: org.apache.spark.sql.types.StringType.type = StringType 
    scala> val lastName = nameTypes
    lastName: org.apache.spark.sql.types.StringType.type = StringType
 
Scala basic Data Types
| Data type | Value assigned in Scala | API to instantiate | 
| --------- | ------------ | ------------ | 
| `ByteType` | `Byte` | `DataTypes.ByteType` |
| `ShortType` | `Short` | `DataTypes.ShortType` | 
| `IntegerType` | `Int` | `DataTypes.IntegerType` | 
| `LongType` | `Long` | `DataTypes.LongType` | 
| `FloatType` | `Float` | `DataTypes.FloatType` | 
| `DoubleType` | `Double` | `DataTypes.DoubleType` | 
| `StringType` | `String` | `DataTypes.StringType` | 
| `BooleanType` | `Boolean` | `DataTypes.BooleanType` | 
| `DecimalType` | `java.math.BigDecimal` | `DecimalType` |

Python Basic Data Types
| Data type | Value assigned in Python | API to instantiate | 
| --------- | ------------ | ------------ | 
| `ByteType` | `int` | `ByteType()` |
| `ShortType` | `int` | `ShortType()` | 
| `IntegerType` | `int` | `IntegerType()` | 
| `LongType` | `int` | `LongType()` | 
| `FloatType` | `float` | `FloatType()` | 
| `DoubleType` | `float` | `DoubleType()` | 
| `StringType` | `str` | `StringType()` | 
| `BooleanType` | `bool` | `BooleanType()` | 
| `DecimalType` | `decimal.Decimal` | `DecimalType()` |
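
Since a Python `int` can be stored in any of four Spark integral types, it helps to know their sizes: `ByteType` is a 1-byte, `ShortType` a 2-byte, `IntegerType` a 4-byte, and `LongType` an 8-byte signed integer. The hypothetical helper below (our own sketch, not a PySpark function) picks the narrowest of these types that can hold a given Python value:

```python
# Spark integral types and their widths in bytes (all signed)
SPARK_INT_TYPES = [
    ("ByteType", 1),
    ("ShortType", 2),
    ("IntegerType", 4),
    ("LongType", 8),
]


def narrowest_int_type(value: int) -> str:
    """Hypothetical helper: smallest Spark integral type that fits `value`."""
    for name, num_bytes in SPARK_INT_TYPES:
        bits = 8 * num_bytes
        low, high = -(2 ** (bits - 1)), 2 ** (bits - 1) - 1
        if low <= value <= high:
            return name
    raise OverflowError(f"{value} does not fit in a 64-bit LongType")


for v in (100, 300, 70_000, 2 ** 40):
    print(v, narrowest_int_type(v))
```

Values beyond the `LongType` range do not fit in any of these types; for exact integers that large you would need `DecimalType` instead.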

### Spark's Structured and Complex Data Types

For complex data analytics, you won't deal only with simple or basic data types. Your data will be complex, often structured or nested, and you'll need Spark to handle these complex data types. They come in many forms: maps, arrays, structs, dates, timestamps, fields, etc.

Scala structured data types in Spark
| Data type | Value assigned in Scala | API to instantiate | 
| --------- | ------------ | ------------ | 
| `BinaryType` | `Array[Byte]` | `DataTypes.BinaryType` |
| `TimestampType` | `java.sql.Timestamp` | `DataTypes.TimestampType` | 
| `DateType` | `java.sql.Date` | `DataTypes.DateType` | 
| `ArrayType` | `scala.collection.Seq` | `DataTypes.createArrayType(ElementType)` | 
| `MapType` | `scala.collection.Map` | `DataTypes.createMapType(keyType, valueType)` | 
| `StructType` | `org.apache.spark.sql.Row` | `StructType(fields: Array[StructField])` | 
| `StructField` | A value type corresponding to the type of this field | `StructField(name, dataType, [nullable])` | 

The equivalent structured data types in Python
| Data type | Value assigned in Python | API to instantiate | 
| --------- | ------------ | ------------ | 
| `BinaryType` | `bytearray` | `BinaryType()` |
| `TimestampType` | `datetime.datetime` | `TimestampType()` | 
| `DateType` | `datetime.date` | `DateType()` | 
| `ArrayType` | List, tuple, or array | `ArrayType(dataType, [nullable])` | 
| `MapType` | `dict` | `MapType(keyType, valueType, [nullable])` | 
| `StructType` | List or tuple | `StructType([fields])` | 
| `StructField` | A value type corresponding to the type of this field | `StructField(name, dataType, [nullable])` | 

## Schemas and Creating DataFrames
A *schema* in Spark defines the column names and associated data types for a DataFrame. Most often, schemas come into play when you are reading structured data from an external data source. Defining a schema up front as opposed to taking a schema-on-read approach offers three benefits:
* You relieve Spark from the onus of inferring data types
* You prevent Spark from creating a separate job just to read a large portion of your file to ascertain the schema, which for a large data file can be expensive and time-consuming.
* You can detect errors early if data doesn't match the schema. 
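
The third benefit, early error detection, can be illustrated in plain Python. The `validate_row` function and the tuple-based schema format below are hypothetical illustrations, not Spark APIs, and the sample rows are made up; Spark performs its own checks and, depending on the reader's mode (for example, permissive versus fail-fast parsing), may null out malformed records rather than raise.

```python
from typing import Any, List, Sequence, Tuple

# Hypothetical schema format: (column name, expected Python type, nullable)
Schema = Sequence[Tuple[str, type, bool]]

BOOK_SCHEMA: Schema = [("author", str, False), ("title", str, False), ("pages", int, False)]


def validate_row(row: Sequence[Any], schema: Schema) -> List[str]:
    """Return a list of problems; an empty list means the row matches."""
    if len(row) != len(schema):
        return [f"expected {len(schema)} fields, got {len(row)}"]
    errors = []
    for value, (name, expected, nullable) in zip(row, schema):
        if value is None:
            if not nullable:
                errors.append(f"{name}: null not allowed")
        elif not isinstance(value, expected):
            errors.append(f"{name}: expected {expected.__name__}, "
                          f"got {type(value).__name__}")
    return errors


print(validate_row(("Author A", "Title A", 120), BOOK_SCHEMA))  # []
print(validate_row(("Author B", None, "400"), BOOK_SCHEMA))
```

With a declared schema, a mismatch like `"400"` arriving where an integer is expected is caught as soon as the data is read, instead of surfacing later in a computation.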

### Two ways to define a schema

Spark allows you to define a schema in two ways. One is to define it programmatically, and the other is to employ a Data Definition Language (DDL) string, which is much simpler and easier to read. To define a schema programmatically for a DataFrame with three named columns, `author`, `title`, and `pages`, you can use the Spark DataFrame API:

    // In Scala
    import org.apache.spark.sql.types._
    val schema = StructType(Array(StructField("author", StringType, false),
    StructField("title", StringType, false), StructField("pages", IntegerType, false)))
    
    # In Python
    from pyspark.sql.types import *
    schema = StructType([StructField("author", StringType(), False),
    StructField("title", StringType(), False),
    StructField("pages", IntegerType(), False)])
    
Defining the same schema using DDL is much simpler:

    // In Scala
    val schema = "author STRING, title STRING, pages INT" 
    
    # In Python
    schema = "author STRING, title STRING, pages INT"
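
A DDL schema string is just a comma-separated list of `name TYPE` pairs, where nested types such as `ARRAY<STRING>` or `MAP<STRING, INT>` may themselves contain commas. The hypothetical `split_ddl` function below is a toy, not Spark's parser (it ignores clauses such as `NOT NULL` or `COMMENT`, and the `Tags` column is made up), but it shows how such a string breaks down into column names and types by tracking angle-bracket depth:

```python
from typing import List, Tuple


def split_ddl(ddl: str) -> List[Tuple[str, str]]:
    """Toy splitter for 'name TYPE, name TYPE, ...' DDL strings."""
    fields, depth, current = [], 0, []
    for ch in ddl + ",":                  # trailing comma flushes the last field
        if ch == "<":
            depth += 1
        elif ch == ">":
            depth -= 1
        if ch == "," and depth == 0:      # only top-level commas separate fields
            name, col_type = "".join(current).strip().split(None, 1)
            fields.append((name.strip("`"), col_type.strip()))
            current = []
        else:
            current.append(ch)
    return fields


print(split_ddl("author STRING, title STRING, pages INT"))
print(split_ddl("`Id` INT, `Tags` MAP<STRING, INT>, `Campaigns` ARRAY<STRING>"))
```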

You can choose whichever approach you prefer when defining a schema. For many examples, we will use DDLs and static schemas.

    # In Python
    from pyspark.sql import SparkSession

    # Define schema for our data using DDL
    schema = ("`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, "
              "`Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>")

    # Create our static data
    data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535,
             ["twitter", "LinkedIn"]],
            [2, "Brooke", "Wenig", "https://tinyurl.2", "5/5/2018", 8908,
             ["twitter", "LinkedIn"]],
            [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659,
             ["web", "twitter", "FB", "LinkedIn"]],
            [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568,
             ["twitter", "FB"]],
            [5, "Matei", "Zaharia", "https://tinyurl.5", "5/14/2014", 40578,
             ["web", "twitter", "FB", "LinkedIn"]],
            [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568,
             ["twitter", "LinkedIn"]]
           ]

    # Main program
    if __name__ == "__main__":
        # Create a SparkSession
        spark = (SparkSession
                 .builder
                 .appName("Example-3_6")
                 .getOrCreate())

        # Create a DataFrame using the schema defined above
        blogs_df = spark.createDataFrame(data, schema)

        # Show the DataFrame; it should reflect our table above
        blogs_df.show()
        # Print the schema used by Spark to process the DataFrame
        # (printSchema() prints on its own and returns None)
        blogs_df.printSchema()
    
If you want to use this schema elsewhere in your code, simply access `blogs_df.schema`, which returns the schema definition. Because the DDL string above does not mark any column `NOT NULL`, every field is nullable (`true`). Older PySpark 3.x releases render it roughly as follows, while newer releases print it in Python constructor syntax instead:

    StructType(List(StructField(Id,IntegerType,true),
    StructField(First,StringType,true),
    StructField(Last,StringType,true),
    StructField(Url,StringType,true),
    StructField(Published,StringType,true),
    StructField(Hits,IntegerType,true),
    StructField(Campaigns,ArrayType(StringType,true),true)))

# Columns and Expressions

As mentioned, named columns in DataFrames are conceptually similar to named columns in pandas or R DataFrames or in an RDBMS table; they describe a type of field. You can list all the columns by their names, and you can perform operations on their values using relational or computational expressions. In Spark's supported languages, columns are objects with public methods (represented by the `Column` type).

There are also logical and mathematical expressions on columns. For example, you could write `expr("columnName * 5")` or `(expr("columnName - 5") > col("anotherColumnName"))`, where `columnName` is a column of a Spark type (integer, string, etc.). `expr()` is part of the `pyspark.sql.functions` (Python) and `org.apache.spark.sql.functions` (Scala) packages. Like any other function in those packages, `expr()` takes arguments that Spark will parse as an expression, computing the result.

Scala, Java, and Python all have public methods associated with columns. You'll note that the Spark documentation refers to both `col` and `Column`. `Column` is the name of the object, while `col()` is a standard built-in function that returns a `Column`.

`Column` objects in a DataFrame can't exist in isolation; each column is part of a row in a record and all the rows together constitute a DataFrame, which as we will see later in the chapter is really a `Dataset[Row]` in Scala.
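
Why can Spark optimize `expr()` and `col()` expressions but not RDD lambdas? Because a `Column` describes *what* to compute as an inspectable expression tree rather than as an opaque function. The classes below are a hypothetical, heavily simplified sketch of that idea (they are not PySpark's `Column` implementation): Python operators build tree nodes that can be both printed and evaluated against a row.

```python
import operator
from typing import Any, Callable, Dict

Row = Dict[str, Any]


class Expr:
    """Toy expression node: operators build a tree instead of computing a value."""

    def _binary(self, other: Any, symbol: str, fn: Callable) -> "BinaryOp":
        right = other if isinstance(other, Expr) else Lit(other)
        return BinaryOp(self, right, symbol, fn)

    def __mul__(self, other: Any) -> "BinaryOp":
        return self._binary(other, "*", operator.mul)

    def __sub__(self, other: Any) -> "BinaryOp":
        return self._binary(other, "-", operator.sub)

    def __gt__(self, other: Any) -> "BinaryOp":
        return self._binary(other, ">", operator.gt)


class Col(Expr):
    """Reference to a named column (loosely mirrors col())."""

    def __init__(self, name: str):
        self.name = name

    def __str__(self) -> str:
        return self.name

    def eval(self, row: Row) -> Any:
        return row[self.name]


class Lit(Expr):
    """A constant value inside an expression."""

    def __init__(self, value: Any):
        self.value = value

    def __str__(self) -> str:
        return repr(self.value)

    def eval(self, row: Row) -> Any:
        return self.value


class BinaryOp(Expr):
    """Binary operation node that can be printed (inspected) or evaluated."""

    def __init__(self, left: Expr, right: Expr, symbol: str, fn: Callable):
        self.left, self.right, self.symbol, self.fn = left, right, symbol, fn

    def __str__(self) -> str:
        return f"({self.left} {self.symbol} {self.right})"

    def eval(self, row: Row) -> Any:
        return self.fn(self.left.eval(row), self.right.eval(row))


# Loosely analogous to (expr("columnName - 5") > col("anotherColumnName"))
predicate = (Col("columnName") - 5) > Col("anotherColumnName")
print(predicate)  # ((columnName - 5) > anotherColumnName)
print(predicate.eval({"columnName": 12, "anotherColumnName": 3}))  # True
```

Because the expression is data, an optimizer can examine and rewrite it before running anything; Spark's optimizer works on a far richer tree representation of this kind, whereas a Python lambda offers no such view.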

# Rows

A row in Spark is a generic *Row object*, containing one or more columns. Each column may be of the same data type (e.g., integer or string), or they can have different types (integer, string, map, array, etc.). Because `Row` is an object in Spark and an ordered collection of fields, you can instantiate a `Row` in each of Spark's supported languages and access its fields by an index starting at 0. `Row` objects can also be used to create DataFrames if you need them for quick interactivity and exploration:

    # In Python
    from pyspark.sql import Row

    rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]
    authors_df = spark.createDataFrame(rows, ["Authors", "State"])
    authors_df.show()

    // In Scala
    import spark.implicits._  // enables toDF() on local collections (auto-imported in spark-shell)
    val rows = Seq(("Matei Zaharia", "CA"), ("Reynold Xin", "CA"))
    val authorsDF = rows.toDF("Authors", "State")
    authorsDF.show()

In practice, you will usually want to read DataFrames from a file as illustrated earlier. In most cases, defining a schema and using it is a quicker and more efficient way to create DataFrames. After you have created a large distributed DataFrame, you are going to want to perform some common data operations on it. Let's examine some of the Spark operations you can perform with high-level relational operators in the Structured APIs.

# Common DataFrame Operations

To perform common data operations on DataFrames, you'll first need to load a DataFrame from a data source that holds your structured data. Spark provides an interface, `DataFrameReader`, that enables you to read data into a DataFrame from myriad data sources in formats such as JSON, CSV, Parquet, Text, Avro, ORC, and more! Likewise, to write a DataFrame back to a data source in a particular format, Spark uses `DataFrameWriter`. 

## Using DataFrameReader and DataFrameWriter

Reading and writing are simple in Spark because of these high-level abstractions and contributions from the community to connect to a wide variety of data sources, including common NoSQL stores, RDBMSs, streaming engines such as Apache Kafka and Kinesis, and more. On large files, it is more efficient to define a schema up front than to have Spark infer it. If you don't want to specify the schema, Spark can infer it from a sample of the data at a lower cost, for example by setting the `samplingRatio` option:

    // In Scala
    val sampleDF = spark
      .read
      .option("inferSchema", true)     // infer column types instead of reading all columns as strings
      .option("samplingRatio", 0.001)  // ...using only about 0.1% of the rows
      .option("header", true)
      .csv("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")
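
Conceptually, schema inference means scanning values and picking the most specific type that fits all of them; sampling trades accuracy for speed by scanning only a fraction of the rows. The sketch below is a simplified, hypothetical model of that trade-off: `infer_value_type`, `infer_column_type`, and `sample_rows` are illustrative helpers, not Spark APIs, and Spark's real CSV inference considers more types (such as longs, decimals, booleans, and timestamps).

```python
import random
from typing import List, Sequence

TYPE_ORDER = ["int", "double", "string"]   # from most to least specific


def infer_value_type(value: str) -> str:
    """Guess the most specific type for one CSV string value."""
    for name, parse in (("int", int), ("double", float)):
        try:
            parse(value)
            return name
        except ValueError:
            pass
    return "string"


def infer_column_type(values: Sequence[str]) -> str:
    """Widen int -> double -> string until every value fits."""
    widest = 0
    for v in values:
        widest = max(widest, TYPE_ORDER.index(infer_value_type(v)))
    return TYPE_ORDER[widest]


def sample_rows(rows: List[str], ratio: float, seed: int = 42) -> List[str]:
    """Keep each row independently with probability `ratio`."""
    rng = random.Random(seed)
    return [r for r in rows if rng.random() < ratio]


rows = [str(i) for i in range(10_000)] + ["3.5"]
print(infer_column_type(rows))                     # double (full scan)
print(infer_column_type(sample_rows(rows, 0.001)))
```

The sampled result is likely to come back as `int`, because the single `3.5` row will probably not be sampled: a cheaper scan can infer a type that is too narrow for the full dataset, which is one more reason to supply a schema for large files.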


In [1]:
# In Python, define a schema
from pyspark.sql.types import *

# Programmatic way to define a schema
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                          StructField('UnitID', StringType(), True),
                          StructField('IncidentNumber', IntegerType(), True),
                          StructField('CallType', StringType(), True),
                          StructField('CallDate', StringType(), True),
                          StructField('WatchDate', StringType(), True),
                          StructField('CallFinalDisposition', StringType(), True),
                          StructField('AvailableDtTm', StringType(), True),
                          StructField('Address', StringType(), True),
                          StructField('City', StringType(), True),
                          StructField('Zipcode', IntegerType(), True),
                          StructField('Battalion', StringType(), True),
                          StructField('StationArea', StringType(), True),
                          StructField('Box', StringType(), True),
                          StructField('OriginalPriority', StringType(), True),
                          StructField('Priority', StringType(), True),
                          StructField('FinalPriority', IntegerType(), True),
                          StructField('ALSUnit', BooleanType(), True),
                          StructField('CallTypeGroup', StringType(), True),
                          StructField('NumAlarms', IntegerType(), True),
                          StructField('UnitType', StringType(), True),
                          StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                          StructField('FirePreventionDistrict', StringType(), True),
                          StructField('SupervisorDistrict', StringType(), True),
                          StructField('Neighborhood', StringType(), True),
                          StructField('Location', StringType(), True),
                          StructField('RowID', StringType(), True),
                          StructField('Delay', FloatType(), True)])

# Use the DataFrameReader interface to read a CSV file
sf_fire_file = "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)


In Scala, this is accomplished similarly. The `spark.read.csv()` function reads the CSV file and returns a DataFrame of rows and named columns with the types dictated in the schema.

To write the DataFrame into an external data source in your format of choice, you can use the `DataFrameWriter` interface. Like `DataFrameReader`, it supports multiple data sources. Parquet, a popular columnar format, is the default format; it uses snappy compression to compress the data. If the DataFrame is written as Parquet, the schema is preserved as part of the Parquet metadata. In this case, subsequent reads back into a DataFrame do not require you to manually supply a schema.

### Saving a DataFrame as a Parquet file or SQL table

A common data operation is to explore and transform your data, and then persist the DataFrame in Parquet format or save it as a SQL table. Persisting a transformed DataFrame is as easy as reading it. For example, to persist the DataFrame as a Parquet file:

    // In Scala to save as a Parquet file
    val parquetPath = ...
    fireDF.write.format("parquet").save(parquetPath)

    # In Python to save as a Parquet file
    parquet_path = ...
    fire_df.write.format("parquet").save(parquet_path)

Alternatively, you can save it as a table, which registers metadata with the Hive metastore:

    // In Scala to save as a table
    val parquetTable = ... // name of the table
    fireDF.write.format("parquet").saveAsTable(parquetTable)

    # In Python
    parquet_table = ... # name of the table
    fire_df.write.format("parquet").saveAsTable(parquet_table)

Once you have a DataFrame, you can apply a variety of transformations and actions to it; the following sections walk through some of the most common ones.
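
Transformations (such as `select()` or `where()`) are lazy: they only record what to do and return a new DataFrame, while actions (such as `show()`, `count()`, or `collect()`) trigger the actual computation. The toy `LazyTable` class below is a hypothetical sketch of that split, not Spark's implementation, and its rows are made up; it records a plan of steps and runs them only when an action is called.

```python
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]


class LazyTable:
    """Hypothetical toy: transformations build a plan, actions execute it."""

    def __init__(self, rows: List[Row], plan: Tuple = ()):
        self._rows = rows
        self.plan = plan  # immutable tuple of (description, function) steps

    # Transformations: return a NEW LazyTable and compute nothing
    def select(self, *cols: str) -> "LazyTable":
        step = (f"select{cols}", lambda rs: [{c: r[c] for c in cols} for r in rs])
        return LazyTable(self._rows, self.plan + (step,))

    def where(self, pred: Callable[[Row], bool]) -> "LazyTable":
        step = ("where", lambda rs: [r for r in rs if pred(r)])
        return LazyTable(self._rows, self.plan + (step,))

    # Actions: run the recorded plan from the original rows
    def collect(self) -> List[Row]:
        rows = self._rows
        for _, fn in self.plan:
            rows = fn(rows)
        return rows

    def count(self) -> int:
        return len(self.collect())


calls = LazyTable([
    {"IncidentNumber": 1, "CallType": "Medical Incident", "Zipcode": 94110},
    {"IncidentNumber": 2, "CallType": "Alarms", "Zipcode": 94103},
])
query = (calls
         .select("IncidentNumber", "CallType")
         .where(lambda r: r["CallType"] != "Medical Incident"))
print(len(query.plan))   # 2 recorded steps, nothing computed yet
print(query.collect())   # [{'IncidentNumber': 2, 'CallType': 'Alarms'}]
print(len(calls.plan))   # 0: the original table is unchanged
```

Spark goes much further with this idea: because the whole plan is known before an action runs, it can reorder and combine steps before executing them.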

### Projections and Filters

A *projection* in relational parlance is a way to return only the columns you ask for, while a *filter* returns only the rows matching a certain relational condition. In Spark, projections are done with the `select()` method, while filters can be expressed using the `filter()` or `where()` method. You can use this technique to examine specific aspects of a dataset, for example:

    # In Python
    few_fire_df = (fire_df
      .select("IncidentNumber", "AvailableDtTm", "CallType")
      .where(col("CallType") != "Medical Incident"))
    few_fire_df.show(5, truncate=False)
    
    // In Scala
    val fewFireDF = fireDF
        .select("IncidentNumber", "AvailableDtTm", "CallType") 
        .where($"CallType" =!= "Medical Incident")
    fewFireDF.show(5, false)

What if we want to know how many distinct `CallTypes` were recorded as the causes of the fire calls? These simple and expressive queries are effective at getting the job done:

    # In Python, return number of distinct types of calls using countDistinct()
    from pyspark.sql.functions import * 
    (fire_df.select("CallType")
      .where(col("CallType").isNotNull())
      .agg(countDistinct("CallType").alias("DistinctCallTypes"))
      .show())
      
    // In Scala
    import org.apache.spark.sql.functions._ 
    fireDF
      .select("CallType")
      .where(col("CallType").isNotNull)
      .agg(countDistinct('CallType) as 'DistinctCallTypes)
      .show()
      
We can list the distinct call types in the data set using these queries:

    # In Python, filter for only distinct non-null CallTypes from all the rows
    (fire_df
      .select("CallType")
      .where(col("CallType").isNotNull())
      .distinct()
      .show(10, False))
      
    // In Scala
    fireDF
      .select("CallType")
      .where($"CallType".isNotNull)
      .distinct()
      .show(10, false)
    

### Renaming, adding, and dropping Columns

Sometimes you want to rename particular columns for reasons of style or convention, and at other times for readability or brevity.  Spaces in column names can be problematic, especially when you want to write or save a DataFrame as a Parquet file (which prohibits this).

By specifying the desired column names in the schema with `StructField`, as we did, we effectively changed all names in the resulting DataFrame. Alternatively, you could selectively rename columns with the `withColumnRenamed()` method. 

    # In Python
    new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
    (new_fire_df
      .select("ResponseDelayedinMins")
      .where(col("ResponseDelayedinMins") > 5)
      .show(5, False))
      
    // In Scala
    val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")
    newFireDF
      .select("ResponseDelayedinMins")
      .where($"ResponseDelayedinMins" > 5)
      .show(5, false)
    
This outputs a new renamed column. Because DataFrame transformations are immutable, when we rename a column using `withColumnRenamed()` we get a new DataFrame while retaining the original with the old column name. Modifying the contents of a column or its type are common operations during data exploration. In some cases the data is raw and dirty, or its types are not amenable to being supplied as arguments to relational operators. For example, in our data several date columns are stored as strings, which could be converted into a more usable type such as Unix timestamps or SQL dates.

So how do we convert data from strings into a usable format? It's quite simple, thanks to some high-level API methods. `pyspark.sql.functions` (and its Scala counterpart, `org.apache.spark.sql.functions`) has a set of to/from date/timestamp functions, such as `to_timestamp()` and `to_date()`, that we can use for just this purpose.

In [2]:
# In Python
fire_ts_df = (new_fire_df
              .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
              .drop("CallDate")
              .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
              .drop("WatchDate")
              .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
              .drop("AvailableDtTm"))

# Select the converted columns
(fire_ts_df
 .select("IncidentDate", "OnWatchDate", "AvailableDtTS")
 .show(5, False))


    // In Scala
    val fireTsDF = newFireDF
        .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")) .drop("CallDate")
        .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")) .drop("WatchDate")
        .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
        .drop("AvailableDtTm")
    
    // Select the converted columns
    fireTsDF.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(5, false)
    
Those queries pack quite a punch; a number of things are happening. Let's unpack what they do:
1. Convert the existing column's data type from string to a Spark-supported timestamp.
2. Use the new format specified in the format string `"MM/dd/yyyy"` or `"MM/dd/yyyy hh:mm:ss a"` where appropriate.
3. After converting to the new data type, `drop()` the old column; the new column is appended under the name given as the first argument to `withColumn()`.
4. Assign the new modified DataFrame to `fire_ts_df`.

The queries result in three new columns: `IncidentDate`, `OnWatchDate`, and `AvailableDtTS`.
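
Spark's pattern letters (`MM`, `dd`, `yyyy`, `hh`, `mm`, `ss`, `a`) follow Java-style datetime-pattern conventions rather than Python's `strptime` codes. As a plain-Python cross-check of what the two patterns above mean, here is a sketch that translates them into what we take to be the equivalent `strptime` directives. The `SPARK_TO_PY` mapping and `to_strptime` helper are our own assumptions covering only the letters used here, and the sample timestamp string is made up.

```python
from datetime import datetime

# Assumed mapping from the Spark/Java pattern letters used above to
# Python strptime directives (only the letters this example needs)
SPARK_TO_PY = {"MM": "%m", "dd": "%d", "yyyy": "%Y",
               "hh": "%I", "mm": "%M", "ss": "%S", "a": "%p"}


def to_strptime(spark_pattern: str) -> str:
    """Translate a pattern whose letter runs are separated by '/', ':' or ' '."""
    out, token = [], ""
    for ch in spark_pattern + " ":      # sentinel space flushes the last token
        if ch.isalpha():
            token += ch
            continue
        if token:
            out.append(SPARK_TO_PY[token])  # KeyError for letters not mapped above
            token = ""
        out.append(ch)
    return "".join(out[:-1])            # drop the sentinel space


fmt = to_strptime("MM/dd/yyyy hh:mm:ss a")
print(fmt)                                               # %m/%d/%Y %I:%M:%S %p
print(datetime.strptime("01/11/2002 01:51:44 AM", fmt))  # 2002-01-11 01:51:44
```

Note that `hh` is a 12-hour clock, which is why the pattern needs the `a` (AM/PM) marker to produce the correct hour.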

Now that we have modified the dates, we can query using functions from `pyspark.sql.functions` like `month()`, `year()`, and `dayofmonth()` to explore our data further. We could find out how many calls were logged in the last seven days, or we could see how many years' worth of Fire Department calls are included in the data set with this query:

    # In Python
    (fire_ts_df
      .select(year('IncidentDate'))
      .distinct()
      .orderBy(year('IncidentDate'))
      .show())

    // In Scala
    fireTsDF
      .select(year($"IncidentDate"))
      .distinct()
      .orderBy(year($"IncidentDate"))
      .show()
      
One final common operation is grouping data by values in a column and aggregating the data in some way, like simply counting it. This pattern of grouping and counting is as common as projecting and filtering.

### Aggregations

What if we want to know what the most common types of fire calls were, or what zip codes accounted for the most calls? These kinds of questions are common in data analysis and exploration. A handful of transformations and actions on DataFrames, such as `groupBy()`, `orderBy()`, and `count()`, offer the ability to aggregate by column names and then aggregate counts across them. For larger DataFrames on which you plan to conduct frequent or repeated queries, you could benefit from caching.

    # In Python
    (fire_ts_df
      .select("CallType")
      .where(col("CallType").isNotNull())
      .groupBy("CallType")
      .count()
      .orderBy("count", ascending=False)
      .show(n=10, truncate=False))
      
    // In Scala
    fireTsDF
        .select("CallType") 
        .where(col("CallType").isNotNull) 
        .groupBy("CallType")
        .count()
        .orderBy(desc("count"))
        .show(10, false)
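
The `groupBy().count().orderBy(...)` pattern is the distributed counterpart of counting occurrences and sorting by frequency. For intuition only, here is the same logic over a small in-memory list using Python's `collections.Counter`; the call-type values below are made-up sample data, not results from the fire-calls dataset.

```python
from collections import Counter

call_types = ["Medical Incident", "Alarms", None, "Medical Incident",
              "Structure Fire", "Alarms", "Medical Incident"]

# where(col("CallType").isNotNull()) -> drop nulls
# groupBy("CallType").count()         -> count per distinct value
counts = Counter(ct for ct in call_types if ct is not None)

# orderBy("count", ascending=False) + show(n=10)
for call_type, n in counts.most_common(10):
    print(f"{call_type:<20} {n}")
```

Unlike `Counter`, Spark computes partial counts on each partition and then combines them after a shuffle, so the data never has to fit on one machine.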


The DataFrame API also offers the `collect()` method, but for extremely large DataFrames this is resource-heavy (expensive) and dangerous, as it can cause out-of-memory (OOM) exceptions. Unlike `count()`, which returns a single number to the driver, `collect()` returns a collection of all the `Row` objects in the entire DataFrame or Dataset. If you want to take a peek at some `Row` records, you're better off with `take(n)`, which will return only the first `n` `Row` objects of the DataFrame.
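
The difference between `take(n)` and `collect()` resembles the difference between slicing a lazy iterator and materializing all of it. The plain-Python sketch below is an analogy, not Spark code: `RowSource` is a hypothetical producer that counts how many rows it actually yields.

```python
from itertools import islice


class RowSource:
    """Hypothetical row producer that records how many rows it has yielded."""

    def __init__(self, total: int):
        self.total = total
        self.produced = 0

    def __iter__(self):
        for i in range(self.total):
            self.produced += 1
            yield i


# Like take(5): only the first 5 rows are produced and returned
source = RowSource(100_000)
first_five = list(islice(source, 5))
print(first_five, source.produced)            # [0, 1, 2, 3, 4] 5

# Like collect(): every row is produced and held in memory at once
source_all = RowSource(100_000)
everything = list(source_all)
print(len(everything), source_all.produced)   # 100000 100000
```

In Spark, `take(n)` may still scan more than `n` rows while it works through partitions, but it returns only `n` `Row` objects to the driver, so the driver's memory use stays bounded.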

### Other Common DataFrame Operations

Along with all the operations above, the DataFrame API provides descriptive statistical functions such as `min()`, `max()`, `sum()`, and `avg()`.
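
Part of what makes these aggregates cheap to compute in parallel is that each one can be built from small per-partition summaries that are then merged: `min`, `max`, and `sum` merge directly, and `avg` is recovered as the total sum divided by the total count (the same trick as the `(sum, count)` pairs in the RDD example earlier). The `Summary` class below is a hypothetical illustration, not a Spark API, and the partitions are made-up lists of numbers.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Iterable


@dataclass(frozen=True)
class Summary:
    """Hypothetical partial aggregate for one (non-empty) partition."""
    count: int
    total: float
    low: float
    high: float

    @staticmethod
    def of(values: Iterable[float]) -> "Summary":
        vals = list(values)
        return Summary(len(vals), sum(vals), min(vals), max(vals))

    def merge(self, other: "Summary") -> "Summary":
        # Merging is associative, so partial results can be combined in any grouping
        return Summary(self.count + other.count, self.total + other.total,
                       min(self.low, other.low), max(self.high, other.high))

    @property
    def mean(self) -> float:
        return self.total / self.count


partitions = [[3.0, 8.5], [1.25, 4.0, 6.0], [2.5]]
overall = reduce(Summary.merge, (Summary.of(p) for p in partitions))
print(overall.low, overall.high, overall.total, overall.mean)
```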