<h1 style="text-align:center"> INFO 323: Cloud Computing and Big Data</h1>
<h2 style="text-align:center"> College of Computing and Informatics</h2>
<h2 style="text-align:center">Drexel University</h2>

<h3 style="text-align:center"> Structured API (Ch 5: Basic Operations)</h3>
<h3 style="text-align:center"> Yuan An, PhD</h3>
<h3 style="text-align:center">Associate Professor</h3>

## Code from Chapter 5 of Spark Definitive Guide

### Create a DataFrame

In [None]:
df = spark.read.format("json").load("2015-summary.json")

Show the schema

In [None]:
# COMMAND ----------

spark.read.format("json").load("2015-summary.json").schema

The example that follows shows how to create and enforce a
specific schema on a DataFrame.

In [None]:
# COMMAND ----------

from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
  StructField("DEST_COUNTRY_NAME", StringType(), True),
  StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
  StructField("count", LongType(), False, metadata={"hello":"world"})
])
df = spark.read.format("json").schema(myManualSchema)\
  .load("2015-summary.json")

### Columns
There are a lot of different ways to construct and refer to columns but the two simplest ways are by
using the col or column functions. To use either of these functions, you pass in a column name:

In [None]:
# COMMAND ----------

from pyspark.sql.functions import col, column
col("someColumnName")
column("someColumnName")

### Columns as expressions
Columns provide a subset of expression functionality. If you use col() and want to perform
transformations on that column, you must perform those on that column reference. When using an
expression, the expr function can actually parse transformations and column references from a string
and can subsequently be passed into further transformations. Let’s look at some examples.

In [None]:
# COMMAND ----------

from pyspark.sql.functions import expr
expr("(((someCol + 5) * 200) - 6) < otherCol")

### Records and Rows
In Spark, each row in a DataFrame is a single record. Spark represents this record as an object of
type Row. Spark manipulates Row objects using column expressions in order to produce usable values.
Row objects internally represent arrays of bytes. The byte array interface is never shown to users
because we only use column expressions to manipulate them.

In [None]:
# COMMAND ----------

from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)

Accessing data in a row is just as easy: specify the position of the value you want.

In [None]:
# COMMAND ----------

myRow[0]
myRow[2]

### Creating DataFrames
We can create DataFrames from raw data sources. We will also register this as a temporary view so that we can query it
with SQL and show off basic transformations in SQL.

In [None]:
# COMMAND ----------

df = spark.read.format("json").load("2015-summary.json")
df.createOrReplaceTempView("dfTable")

We can also create DataFrames on the fly by taking a set of rows and converting them to a DataFrame.

In [None]:
# COMMAND ----------

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
  StructField("some", StringType(), True),
  StructField("col", StringType(), True),
  StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

Use the select method and pass in the column names as
strings with which you would like to work:

In [None]:
# COMMAND ----------

df.select("DEST_COUNTRY_NAME").show(2)

You can select multiple columns by using the same style of query, just add more column name strings
to your select method call:

In [None]:
# COMMAND ----------

df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

You can refer to columns in a number of different ways;
all you need to keep in mind is that you can use them interchangeably:

In [None]:
# COMMAND ----------

from pyspark.sql.functions import expr, col, column
df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"))\
  .show(2)

As we’ve seen thus far, expr is the most flexible reference that we can use. It can refer to a plain
column or a string manipulation of a column. To illustrate, let’s change the column name, and then
change it back by using the AS keyword and then the alias method on the column:

In [None]:
# COMMAND ----------

df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

This changes the column name to “destination.” You can further manipulate the result of your
expression as another expression:

In [None]:
# COMMAND ----------

df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))\
  .show(2)

Because select followed by a series of expr is such a common pattern, Spark has a shorthand for
doing this efficiently: selectExpr. This is probably the most convenient interface for everyday use:

In [None]:
# COMMAND ----------

df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

This opens up the true power of Spark. We can treat selectExpr as a simple way to build up
complex expressions that create new DataFrames. In fact, we can add any valid non-aggregating SQL
statement, and as long as the columns resolve, it will be valid! Here’s a simple example that adds a
new column withinCountry to our DataFrame that specifies whether the destination and origin are
the same:

In [None]:
# COMMAND ----------

df.selectExpr(
  "*", # all original columns
  "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
  .show(2)

With selectExpr, we can also specify aggregations over the entire DataFrame by taking
advantage of the functions that we have. These look just like what we have been showing so far:

In [None]:
# COMMAND ----------

df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

### Converting to Spark Types (Literals)
Sometimes, we need to pass explicit values into Spark that are just a value (rather than a new
column). This might be a constant value or something we’ll need to compare to later on. The way we
do this is through literals. This is basically a translation from a given programming language’s literal
value to one that Spark understands. Literals are expressions and you can use them in the same way:

In [None]:
# COMMAND ----------

from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias("One")).show(2)

### Adding Columns
There’s also a more formal way of adding a new column to a DataFrame, and that’s by using the
withColumn method on our DataFrame. For example, let’s add a column that holds the literal
value 1 in every row:

In [None]:
# COMMAND ----------

df.withColumn("numberOne", lit(1)).show(2)

Let’s do something a bit more interesting and make it an actual expression. In the next example, we’ll
set a Boolean flag for when the origin country is the same as the destination country:

In [None]:
# COMMAND ----------

df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
  .show(2)

Notice that the withColumn function takes two arguments: the column name and the expression that
will create the value for that given row in the DataFrame. Interestingly, we can also rename a column
this way, by adding a copy of it under the new name. A more direct alternative is
the withColumnRenamed method, which renames the column named in the first
argument to the string in the second argument:

In [None]:
# COMMAND ----------

df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

### Removing Columns
Now that we’ve created this column, let’s take a look at how we can remove columns from
DataFrames. You likely already noticed that we can do this by using select. However, there is also a
dedicated method called drop:

In [None]:
df.drop("ORIGIN_COUNTRY_NAME").columns

We can drop multiple columns by passing in multiple columns as arguments:

In [None]:
df.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").columns

### Filtering Rows
To filter rows, we create an expression that evaluates to true or false. Rows for which the
expression evaluates to false are filtered out. The most common way to do this with DataFrames is to
create either an expression as a String or build an expression by using a set of column manipulations.
There are two methods to perform this operation: you can use where or filter and they both will
perform the same operation and accept the same argument types when used with DataFrames. We will
stick to where because it is familiar from SQL; however, filter is valid as well.

In [None]:
# COMMAND ----------

df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
  .show(2)

### Getting Unique Rows
A very common use case is to extract the unique or distinct values in a DataFrame. These values can
be in one or more columns. The way we do this is by using the distinct method on a DataFrame,
which allows us to deduplicate any rows that are in that DataFrame. For instance, let’s get the unique
origins in our dataset. This, of course, is a transformation that will return a new DataFrame with only
unique rows:

In [None]:
# COMMAND ----------

df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

In [None]:
# COMMAND ----------

df.select("ORIGIN_COUNTRY_NAME").distinct().count()

### Random Splits
Random splits can be helpful when you need to break up your DataFrame into random “splits” of
the original DataFrame. This is often used with machine learning algorithms to create training,
validation, and test sets.

In [None]:
# COMMAND ----------

seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

In [None]:
# COMMAND ----------

dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count() # False

### Concatenating and Appending Rows (Union)
As you learned in the previous section, DataFrames are immutable. This means users cannot append
to a DataFrame, because that would change it. To append to a DataFrame, you must union the
original DataFrame with the new DataFrame. This just concatenates the two DataFrames. To
union two DataFrames, you must be sure that they have the same schema and number of columns;
otherwise, the union will fail.

In [None]:
# COMMAND ----------

from pyspark.sql import Row
schema = df.schema
newRows = [
  Row("New Country", "Other Country", 5),  # Python 3 has no 5L long-literal syntax
  Row("New Country 2", "Other Country 3", 1)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

In [None]:
# COMMAND ----------

df.union(newDF)\
  .where("count = 1")\
  .where(col("ORIGIN_COUNTRY_NAME") != "United States")\
  .show()

### Sorting Rows
When we sort the values in a DataFrame, we always want to sort with either the largest or smallest
values at the top of a DataFrame. There are two equivalent operations to do this, sort and orderBy,
that work the exact same way. They accept both column expressions and strings as well as multiple
columns. The default is to sort in ascending order:

In [None]:
# COMMAND ----------

df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

To more explicitly specify sort direction, you need to use the asc and desc functions if operating on a
column. These allow you to specify the order in which a given column should be sorted:

In [None]:
# COMMAND ----------

from pyspark.sql.functions import desc, asc
# expr("count desc") is parsed as the column count aliased to "desc", so it sorts ascending
df.orderBy(desc("count")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

For optimization purposes, it’s sometimes advisable to sort within each partition before another set of
transformations. You can use the sortWithinPartitions method to do this:

In [None]:
# COMMAND ----------

spark.read.format("json").load("*-summary.json")\
  .sortWithinPartitions("count")

### Limit
Oftentimes, you might want to restrict what you extract from a DataFrame; for example, you might
want just the top ten of some DataFrame. You can do this by using the limit method:

In [None]:
# COMMAND ----------

df.limit(5).show()


# COMMAND ----------

df.orderBy(desc("count")).limit(6).show()

## Spark SQL

In [None]:
df.createOrReplaceTempView('df_table')

In [None]:
spark.sql("select * from df_table").show(3)

In [None]:
spark.sql('select DEST_COUNTRY_NAME, count(ORIGIN_COUNTRY_NAME) as origins from df_table group by DEST_COUNTRY_NAME order by origins desc').show(3)

## To Pandas

In [None]:
df.toPandas() #Why and when?

In [None]:
df_pd = df.toPandas()

In [None]:
df_sp = spark.createDataFrame(df_pd)

### Repartition and Coalesce
Another important optimization opportunity is to partition the data according to some frequently
filtered columns, which control the physical layout of data across the cluster including the partitioning
scheme and the number of partitions.
Repartition will incur a full shuffle of the data, regardless of whether one is necessary. This means
that you should typically only repartition when the future number of partitions is greater than your
current number of partitions or when you are looking to partition by a set of columns:

In [None]:
# COMMAND ----------

df.rdd.getNumPartitions() # 1

In [None]:
# COMMAND ----------

df.repartition(5)

If you know that you’re going to be filtering by a certain column often, it can be worth repartitioning
based on that column:

In [None]:
# COMMAND ----------

df.repartition(col("DEST_COUNTRY_NAME"))

You can optionally specify the number of partitions you would like, too:

In [None]:
# COMMAND ----------

df.repartition(5, col("DEST_COUNTRY_NAME"))

Coalesce, on the other hand, will not incur a full shuffle and will try to combine partitions. The
following operation will shuffle your data into five partitions based on the destination country name, and then
coalesce them (without a full shuffle):

In [None]:
# COMMAND ----------

df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

### Collecting Rows to the Driver
As discussed in previous chapters, Spark maintains the state of the cluster in the driver. There are
times when you’ll want to collect some of your data to the driver in order to manipulate it on your
local machine.
Thus far, we did not explicitly define this operation. However, we used several different methods for
doing so that are effectively all the same. collect gets all data from the entire DataFrame, take
selects the first N rows, and show prints out a number of rows nicely.

In [None]:
# COMMAND ----------

collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count
collectDF.show() # this prints it out nicely
collectDF.show(5, False)
collectDF.collect()