In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Ch 3. Basic Structured Operations

In [1]:
!pwd

/Users/kaden.cho/git/kadensungbincho/Code_followup_book/spark_definitive_guide


In [2]:
df = spark.read.format("json")\
    .load("./2015-summary.json")

In [3]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



## Schemas

In [5]:
spark.read.format("json")\
    .load("./2015-summary.json").schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))

In [34]:
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False),
])

In [35]:
df = spark.read.format("json")\
    .schema(myManualSchema)\
    .load("./2015-summary.json")

In [36]:
from pyspark.sql.functions import col, column

In [37]:
col("someColumnName")
column("someColumnName")

Column<b'someColumnName'>

## Explicit Column References

If you need to refer to a specific DataFrame's column, you can use the col method on the specific DataFrame. This can be useful when you are performing a join and need to refer to a specific column in one DataFrame that may share a name with another column in the joined DataFrame. We will see this in the joins chapter. As an added benefit, Spark does not need to resolve this column itself because we did that for Spark.

## Expressions

Now we mentioned that columns are expressions, so what is an expression? An expression is a set of transformations on one or more values in a record in a DataFrame. Think of it like a function that takes as input one or more column names, resolves them, and then potentially applies more expressions to create a single value for each record in the dataset. Importantly, this "single value" can actually be a complex type like a Map type or Array type.

## Columns as Expressions

Columns provide a subset of expression functionality. If you use col() and wish to perform transformations on that column, you must perform those on that column reference. When using an expression, the expr function can actually parse transformations and column references from a string, and the result can subsequently be passed into further transformations. Let's look at some examples.

In [12]:
(((col("someCol") + 5) * 200) - 6) < col("otherCol")

Column<b'((((someCol + 5) * 200) - 6) < otherCol)'>

In [13]:
from pyspark.sql.functions import expr
expr("(((someCol +5) * 200) - 6) < otherCol")

Column<b'((((someCol + 5) * 200) - 6) < otherCol)'>

This is an extremely important point to reinforce. Notice how the previous expression is actually valid SQL code as well, just like you might put in a SELECT statement? That's because this SQL expression and the previous DataFrame code compile to the same underlying logical tree prior to execution. This means you can write your expressions as DataFrame code or as SQL expressions and get the exact same benefits. You likely saw all of this in the first chapters of the book, and we covered this more extensively in the Overview of the Structured APIs chapter.

## Records and Rows

In Spark, each row in a DataFrame is a single record. Spark represents a logical record as an object of type Row. Row objects are the objects that column expressions operate on to produce some usable value. Internally, Row objects represent physical byte arrays. The byte array interface is never shown to users because we only use column expressions to manipulate them.

In [14]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COURTRY_NAME=None, count=15)

## Creating Rows

You can create rows by manually instantiating a Row object with the values that belong in each column. It's important to note that only DataFrames have schemas; rows themselves do not. This means that if you create a Row manually, you must specify the values in the same order as the schema of the DataFrame they may be appended to. We will see this when we discuss creating DataFrames.

In [21]:
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)

In [22]:
myRow[0]

'Hello'

In [23]:
myRow[2]

1

## DataFrame Transformations

Now that we have briefly defined the core parts of a DataFrame, we will move on to manipulating DataFrames. When working with individual DataFrames there are some fundamental objectives. These break down into several core operations.

## Creating DataFrames

As we saw previously, we can create DataFrames from raw data sources. This is covered extensively in the Data Sources chapter; however, we will use them now to create an example DataFrame. For illustration purposes later in this chapter, we will also register this as a temporary view so that we can query it with SQL.

In [25]:
df.createOrReplaceTempView("dfTable")

In [27]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
    StructField("some", StringType(), True),
    StructField("col", StringType(), True),
    StructField("names", LongType(), False),
])

myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+



In [28]:
df.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [32]:
df.describe()

DataFrame[summary: string, DEST_COUNTRY_NAME: string, ORIGIN_COURTRY_NAME: string, count: string]

In [38]:
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)



+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [39]:
from pyspark.sql.functions import expr, col, column

df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"))\
    .show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



In [40]:
df.select(expr("DEST_COUNTRY_NAME AS destination"))

DataFrame[destination: string]

In [41]:
df.select(expr("DEST_COUNTRY_NAME as destination")).alias("DEST_COUNTRY_NAME")

DataFrame[destination: string]

In [42]:
df.selectExpr(
    "DEST_COUNTRY_NAME  as newColumnName",
    "DEST_COUNTRY_NAME").show(2)

+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [43]:
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))")

DataFrame[avg(count): double, count(DISTINCT DEST_COUNTRY_NAME): bigint]

## Converting to Spark Types

In [44]:
from pyspark.sql.functions import lit

df.select(
    expr("*"),
    lit(1).alias("One")
).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



## Adding Columns

In [45]:
df.withColumn("numberOne", lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [46]:
df.withColumn(
    "withincountry",
    expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
    .show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withincountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



## Renaming Columns

In [47]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

## Reserved Characters and Keywords in Column Names

In [48]:
dfWithLongColName = df\
    .withColumn(
        "This Long Column-Name",
        expr("ORIGIN_COUNTRY_NAME"))

In [49]:
dfWithLongColName\
    .selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`")\
    .show(2)

+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows



## Removing Columns

In [50]:
df.drop("ORIGIN_COUNTRY_NAME").columns

['DEST_COUNTRY_NAME', 'count']

## Changing a Column's Type

In [51]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [52]:
df.withColumn("count", col("count").cast("int")).printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



## Filtering Rows

To filter rows, we create an expression that evaluates to true or false, and then filter out the rows for which the expression evaluates to false. The most common way to do this with DataFrames is to either write the expression as a string or build it from a set of column manipulations. There are two methods for this operation, where and filter; both perform the same operation and accept the same argument types when used with DataFrames. The Dataset API has slightly different options; refer to the Dataset chapter for more information.

In [53]:
colCondition = df.filter(col("count") < 2).take(2)

In [54]:
conditional = df.where("count < 2").take(2)

In [55]:
df.where(col("count") < 2)\
    .where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
    .show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



## Getting Unique Rows

A very common use case is to get the unique or distinct values in a DataFrame. These values can be in one or more columns. We do this with the distinct method on a DataFrame, which deduplicates the rows in that DataFrame. For instance, let's get the unique origins in our dataset. This is, of course, a transformation that returns a new DataFrame with only unique rows.

In [56]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").count()

256

In [57]:
df.select("ORIGIN_COUNTRY_NAME").count()

256

## Random Samples

In [58]:
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

126

## Random Splits

In [59]:
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count()

False

## Concatenating and Appending Rows to DataFrames

In [62]:
from pyspark.sql import Row
schema = df.schema

newRows = [
    Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 3", 1)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

In [63]:
df.union(newDF)\
    .where("count = 1")\
    .where(col("ORIGIN_COUNTRY_NAME") != "Unites States")\
    .show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Croatia|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
|               Malta|      United States|    1|
|       United States|          Gibraltar|    1|
|Saint Vincent and...|      United States|    1|
|            Suriname|      United States|    1|
|       United States|             Cyprus|    1|
|        Burkina Faso|      United States|    1|
|            Djibouti|      United States|    1|
|       United States|            Estonia|    1|
|              Zambia|      United States|    1|
|              Cyprus|      United States|    1|
|       United States|          Lithuania|    1|
|       United States|           Bulgaria|    1|
|       United States|            Georgia|    1|
|       United States|            Bahrain|    1|
|       Cote d'Ivoir

## Sorting Rows

In [64]:
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [65]:
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc")).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



## Repartition and Coalesce

Another important optimization opportunity is to partition the data according to some frequently filtered columns, which controls the physical layout of data across the cluster, including the partitioning scheme and the number of partitions.

Repartition will incur a full shuffle of the data, regardless of whether or not one is necessary. This means that you should typically only repartition when the future number of partitions is greater than your current number of partitions or when you are looking to partition by a set of columns.

In [66]:
df.repartition(5)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [67]:
df.repartition(col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [68]:
df.repartition(5, col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

## Collecting Rows to the Driver

As we covered in the previous chapters, Spark has a Driver that maintains cluster information and runs user code. This means that when we call some method to collect data, the data is collected to the Spark Driver.

## Working with Dates and Timestamps

Dates and times are a constant challenge in programming languages and databases. It's always necessary to keep track of timezones and make sure that formats are correct and valid. Spark does its best to keep things simple by focusing explicitly on two kinds of time-related information. There are dates, which focus exclusively on calendar dates, and timestamps, which include both date and time information.

In [6]:
from pyspark.sql.functions import current_date, current_timestamp
dateDF = spark.range(10)\
    .withColumn("today", current_date())\
    .withColumn("now", current_timestamp())

In [7]:
dateDF.createOrReplaceTempView("dateTable")

In [8]:
dateDF.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



Now that we have a simple DataFrame to work with, let's add and subtract 5 days from today. These functions take a column and then the number of days to either add or subtract as the arguments.

In [9]:
from pyspark.sql.functions import col, date_add, date_sub
dateDF\
    .select(
        date_sub(col("today"), 5),
        date_add(col("today"), 5))\
    .show(1)

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2019-03-17|        2019-03-27|
+------------------+------------------+
only showing top 1 row



Another common task is to look at the difference between two dates. We can do this with the datediff function, which returns the number of days between two dates. Most often we only care about the days, but because months vary in length there is also a months_between function that gives the number of months between two dates.

In [10]:
from pyspark.sql.functions import to_date, lit

spark.range(5).withColumn("date", lit("2017-01-01"))\
    .select(to_date(col("date")))\
    .show(1)

+---------------+
|to_date(`date`)|
+---------------+
|     2017-01-01|
+---------------+
only showing top 1 row



## Working with Nulls in Data

As a best practice, you should always use nulls to represent missing or empty data in your DataFrames. Spark can optimize working with null values more than it can if you use empty strings or other values. The primary way of interacting with null values, at DataFrame scale, is to use the .na subpackage on a DataFrame.
In Spark there are two things you can do with null values. You can explicitly drop nulls or you can fill them with a value.

## Drop

The simplest is probably drop, which simply removes rows that contain nulls. The default is to drop any row in which any value is null.