# Spark Practice 

Spark Practice and Notes Following O'Reilly's Spark: The Definitive Guide

1 - ON STARTING SPARK CLUSTER

"Some of your legacy code might use the new SparkContext pattern. This should be avoided in favor of the builder method on the SparkSession, which more robustly instantiates the Spark and SQL Contexts and ensures that there is no context conflict, given that there might be multiple libraries trying to create a session in the same Spark Application."

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local").appName("Word Count")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()

From the SparkSession, you can also access all of the low-level and legacy contexts and configurations. Note that the SparkSession class was only added in Spark 2.X. Older code you might find would instead directly create a SparkContext and a SQLContext for the structured APIs.

A SparkContext object within the SparkSession represents the connection to the Spark cluster. This class is how you communicate with some of Spark’s lower-level APIs, such as RDDs. It is commonly stored as the variable sc in older examples and documentation. Through a SparkContext, you can create RDDs, accumulators, and broadcast variables, and you can run code on the cluster. For the most part, you should not need to explicitly initialize a SparkContext; you should just be able to access it through the SparkSession.

Partitioning:

In [3]:
df1 = spark.range(2, 10000000, 2)
df2 = spark.range(2, 10000000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")

step4.collect()

[Row(sum(id)=2500000000000)]
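
The value can be cross-checked without Spark. After step2 every id is a multiple of 10, while df2 holds 2, 6, 10, ..., that is, numbers congruent to 2 modulo 4. A multiple of 10 satisfies that only when it is an odd multiple of 10, so the join keeps 10, 30, 50, ..., 9999990. A plain-Python sketch of that check (the variable names are illustrative and not part of the notebook):

```python
# Ids produced by step2: every even number in [2, 10000000) times 5.
left = range(10, 50000000, 10)
# Ids in df2: 2, 6, 10, ... below 10000000.
right = range(2, 10000000, 4)

# A value is in both ranges when it is an odd multiple of 10 below 10000000,
# so the matching ids are 10, 30, 50, ..., 9999990.
matches = range(10, 10000000, 20)

# Spot-check the claim on the first and last matching ids.
assert all(m in left and m in right for m in (matches[0], matches[-1]))

total = sum(matches)
print(len(matches), total)
```

There are 500000 matching ids, and their sum is 500000 * (10 + 9999990) / 2 = 2500000000000, which agrees with the collected result.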

In [4]:
step4.explain()

== Physical Plan ==
*HashAggregate(keys=[], functions=[sum(id#8L)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_sum(id#8L)])
      +- *Project [id#8L]
         +- *SortMergeJoin [id#8L], [id#3L], Inner
            :- *Sort [id#8L ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(id#8L, 200)
            :     +- *Project [(id#0L * 5) AS id#8L]
            :        +- Exchange RoundRobinPartitioning(5)
            :           +- *Range (2, 10000000, step=2, splits=Some(1))
            +- *Sort [id#3L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(id#3L, 200)
                  +- Exchange RoundRobinPartitioning(6)
                     +- *Range (2, 10000000, step=4, splits=Some(1))


Stages in Spark represent groups of tasks that can be executed together to compute the same operation on multiple machines. In general, Spark will try to pack as much work as possible (i.e., as many transformations as possible inside your job) into the same stage, but the engine starts new stages after operations called shuffles. A shuffle represents a physical repartitioning of the data—for example, sorting a DataFrame, or grouping data that was loaded from a file by key (which requires sending records with the same key to the same node). This type of repartitioning requires coordinating across executors to move data around. Spark starts a new stage after each shuffle, and keeps track of what order the stages must run in to compute the final result.

Stages in Spark consist of tasks. Each task corresponds to a combination of blocks of data and a set of transformations that will run on a single executor. If there is one big partition in our dataset, we will have one task. If there are 1,000 little partitions, we will have 1,000 tasks that can be executed in parallel. Partitioning your data into a greater number of partitions means that more can be executed in parallel.
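
To make the relationship between partitions and tasks concrete, here is a plain-Python sketch (not Spark code) that deals records into partitions roughly the way a round-robin repartition would. `round_robin_partition` is a hypothetical helper, and each resulting list stands for the input of one task.

```python
def round_robin_partition(records, num_partitions):
    """Deal records into num_partitions lists, one record at a time."""
    partitions = [[] for _ in range(num_partitions)]
    for index, record in enumerate(records):
        partitions[index % num_partitions].append(record)
    return partitions

records = list(range(2, 42, 2))        # 20 records: 2, 4, ..., 40
partitions = round_robin_partition(records, 5)

# One task per partition: 5 partitions means 5 tasks that can run in parallel.
num_tasks = len(partitions)
print(num_tasks, [len(p) for p in partitions])
```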

The first two stages correspond to the range calls that create the DataFrames. In the book's run, a DataFrame created with range has eight partitions by default; the number depends on the available parallelism, and in the local plan above each Range shows splits=Some(1). The next step is the repartitioning, which changes the number of partitions by shuffling the data. These DataFrames are shuffled into six partitions and five partitions, corresponding to the number of tasks in stages 3 and 4.

Stages 3 and 4 operate on each of those DataFrames, and the end of the stage represents the join (a shuffle). Suddenly, we have 200 tasks. This comes from a Spark SQL configuration: spark.sql.shuffle.partitions defaults to 200, so a shuffle performed during execution outputs 200 shuffle partitions by default, which matches the Exchange hashpartitioning(..., 200) steps in the plan above. You can change this value, and the number of output partitions will change.
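
A rough model of why the join needs a shuffle: hash partitioning sends every record with the same key to the same output partition, so matching ids from both sides meet in one task. The sketch below is plain Python, and it uses `key % num_partitions` as a stand-in for Spark's real hash function, which is an assumption made for readability. The 200 mirrors the default of spark.sql.shuffle.partitions, and the input ranges are shortened versions of the ones in the notebook.

```python
from collections import defaultdict

NUM_SHUFFLE_PARTITIONS = 200  # mirrors the spark.sql.shuffle.partitions default

def hash_partition(keys, num_partitions):
    """Group keys by target partition; key % n stands in for Spark's hash."""
    buckets = defaultdict(list)
    for key in keys:
        buckets[key % num_partitions].append(key)
    return buckets

left = hash_partition(range(10, 2000, 10), NUM_SHUFFLE_PARTITIONS)
right = hash_partition(range(2, 2000, 4), NUM_SHUFFLE_PARTITIONS)

# Join partition by partition: only keys in the same bucket can match.
joined = []
for pid in range(NUM_SHUFFLE_PARTITIONS):
    right_keys = set(right.get(pid, []))
    joined.extend(k for k in left.get(pid, []) if k in right_keys)

print(sorted(joined)[:5])
```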

Spark's execution has two notable properties. First, it automatically pipelines stages and tasks that can be done together, such as a map operation followed by another map operation. Second, for all shuffle operations, Spark writes the data to stable storage (e.g., disk) and can reuse it across multiple jobs.

An important part of what makes Spark an “in-memory computation tool” is that unlike the tools that came before it (e.g., MapReduce), Spark performs as many steps as it can at one point in time before writing data to memory or disk.
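
Pipelining can be pictured with Python generators: two map steps are fused so that each record flows through both without the intermediate result being materialized. This is only an analogy in plain Python, not a description of how Spark is implemented, and the function names are made up for the illustration.

```python
def map_stage(func, records):
    """Lazily apply func to each record, like a narrow transformation."""
    for record in records:
        yield func(record)

calls = []  # records the order in which the two functions actually run

def times_five(x):
    calls.append(("times_five", x))
    return x * 5

def plus_one(x):
    calls.append(("plus_one", x))
    return x + 1

partition = [2, 4, 6]
# Two chained maps; nothing runs until the result is consumed.
pipeline = map_stage(plus_one, map_stage(times_five, partition))
result = list(pipeline)

print(result)
print(calls[:2])
```

The call log shows the two functions interleaving record by record instead of the first map finishing before the second starts.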

The second property you’ll sometimes see is shuffle persistence. When Spark needs to run an operation that has to move data across nodes, such as a reduce-by-key operation (where input data for each key needs to first be brought together from many nodes), the engine can’t perform pipelining anymore, and instead it performs a cross-network shuffle.

2 - ON LOADING DATA

Spark has six “core” data sources and hundreds of external data sources written by the community. Spark’s core data sources are CSV, JSON, Parquet, ORC, JDBC/ODBC connections, and plain-text files.

Spark has numerous community-created data sources such as Cassandra, HBase, MongoDB, AWS Redshift, and XML.

The foundation for reading data in Spark is the DataFrameReader. We access this through the SparkSession via the read attribute. After we have a DataFrameReader, we specify several values: the format, the schema, the read mode, and a series of options.

The format, options, and schema methods each return a DataFrameReader that can be configured further. All of them are optional; at a minimum, you must supply a path to read from.

In [5]:
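#Loading Test Data
#Note: the text source reads each line into a single string column named "value";
#header and inferSchema are csv options and have no effect here
df = spark.read.format('text').options(header='true', inferSchema='true').load("StockData/ETFs/aadr.us.txt")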

In [6]:
#Loading Test Data
df_new = spark.read.format('text').options(header='true', inferSchema='true').load("StockData/ETFs/aadr.us.txt")

In [7]:
df.take(3)

[Row(value=u'Date,Open,High,Low,Close,Volume,OpenInt'),
 Row(value=u'2010-07-21,24.333,24.333,23.946,23.946,43321,0'),
 Row(value=u'2010-07-22,24.644,24.644,24.362,24.487,18031,0')]

In [8]:
df.columns

['value']

In [9]:
display(df)

DataFrame[value: string]
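
The single value column means the header line and the comma-separated fields are still unparsed. As a plain-Python illustration of the parsing that a csv-style read performs on the same lines, the sketch below runs Python's csv module over the three lines from the take(3) output above. It is not Spark code, and the variable names are illustrative.

```python
import csv
import io

# The three raw lines shown in the take(3) output above.
raw_lines = [
    "Date,Open,High,Low,Close,Volume,OpenInt",
    "2010-07-21,24.333,24.333,23.946,23.946,43321,0",
    "2010-07-22,24.644,24.644,24.362,24.487,18031,0",
]

# DictReader treats the first line as the header, like header='true'.
reader = csv.DictReader(io.StringIO("\n".join(raw_lines)))
rows = list(reader)

print(reader.fieldnames)
print(rows[0]["Date"], float(rows[0]["Close"]))
```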

In [10]:
#spark.stop()

In [11]:
df_pq = spark.read.format("parquet")\
  .load("../dev/master.parquet")

In [None]:
#spark.stop()

Loading CSV File

In [None]:
df = spark.read.format('csv').options(header='true', inferSchema='true').load('abcnews-date-text.csv')

In [None]:
df.take(4)

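DataFrames can also be created from scratch. First we need to create a schema and then we can create a DataFrame with manually created rows. In the run recorded below, the final take(1) call fails with a Py4JJavaError. The traceback shows the worker loading pyspark from the spark-2.1.1-bin-hadoop2.7 distribution while the pickled function comes from a pyspark installed in the anaconda2 site-packages, so the likely cause is a version mismatch between those two pyspark installations rather than a problem with the schema or the row.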

In [5]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [6]:
#Notice the Boolean value describes whether the field is nullable
testSchema = StructType([
  StructField("col1", StringType(), True),
  StructField("col2", StringType(), True)
])

In [7]:
myRow = Row("Hi", "There")
myDf = spark.createDataFrame([myRow], testSchema)
myDf.take(1)

Py4JJavaError: An error occurred while calling o65.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 214, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/pauldefusco/Documents/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 163, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/Users/pauldefusco/Documents/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
  File "/Users/pauldefusco/Documents/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/Users/pauldefusco/Documents/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 454, in loads
    return pickle.loads(obj)
  File "/Users/pauldefusco/Documents/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 783, in _make_skel_func
    closure = _reconstruct_closure(closures) if closures else None
  File "/Users/pauldefusco/Documents/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 775, in _reconstruct_closure
    return tuple([_make_cell(v) for v in values])
TypeError: ("'int' object is not iterable", <function _make_skel_func at 0x10ad56398>, (<code object func at 0x1105f3530, file "/Users/pauldefusco/anaconda2/envs/py27/lib/python2.7/site-packages/pyspark/rdd.py", line 333>, 1, {}))

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2768)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2765)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


Working with the Kaggle Flight Delays Dataset: https://www.kaggle.com/usdot/flight-delays/data

In [8]:
airlines = spark.read.format('csv').options(header='true', inferSchema='true').load('flight-delays/airlines.csv')

In [9]:
flights = spark.read.format('csv').options(header='true', inferSchema='true').load('flight-delays/flights.csv')

In [10]:
airports = spark.read.format('csv').options(header='true', inferSchema='true').load('flight-delays/airports.csv')

In [11]:
airlines.take(1)

[Row(IATA_CODE=u'UA', AIRLINE=u'United Air Lines Inc.')]

In [12]:
flights.take(1)

[Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE=u'AS', FLIGHT_NUMBER=98, TAIL_NUMBER=u'N407AS', ORIGIN_AIRPORT=u'ANC', DESTINATION_AIRPORT=u'SEA', SCHEDULED_DEPARTURE=5, DEPARTURE_TIME=2354, DEPARTURE_DELAY=-11, TAXI_OUT=21, WHEELS_OFF=15, SCHEDULED_TIME=205, ELAPSED_TIME=194, AIR_TIME=169, DISTANCE=1448, WHEELS_ON=404, TAXI_IN=4, SCHEDULED_ARRIVAL=430, ARRIVAL_TIME=408, ARRIVAL_DELAY=-22, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None)]

In [13]:
airports.take(1)

[Row(IATA_CODE=u'ABE', AIRPORT=u'Lehigh Valley International Airport', CITY=u'Allentown', STATE=u'PA', COUNTRY=u'USA', LATITUDE=40.65236, LONGITUDE=-75.4404)]

In [14]:
airlines.printSchema()
flights.printSchema()
airports.printSchema()

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable 

A schema is a StructType (Spark's complex types) made up of a number of fields, StructFields, that have a name, type, a Boolean flag which specifies whether that column can contain missing or null values, and, finally, users can optionally specify associated metadata with that column. If the types in the data (at runtime) do not match the schema, Spark will throw an error.
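
As a mental model only (this is plain Python, not the pyspark.sql.types API), a schema can be viewed as an ordered list of fields, each with a name, a type, and a nullable flag, against which every row is checked. `Field` and `validate_row` are hypothetical names for this sketch, and Python types stand in for Spark types.

```python
from collections import namedtuple

# name, Python type standing in for a Spark type, and the nullable flag.
Field = namedtuple("Field", ["name", "dtype", "nullable"])

schema = [
    Field("IATA_CODE", str, True),
    Field("AIRLINE", str, True),
]

def validate_row(row, schema):
    """Raise if the row's length, nulls, or types disagree with the schema."""
    if len(row) != len(schema):
        raise ValueError("expected %d values, got %d" % (len(schema), len(row)))
    for value, field in zip(row, schema):
        if value is None:
            if not field.nullable:
                raise ValueError("%s cannot be null" % field.name)
        elif not isinstance(value, field.dtype):
            raise TypeError("%s expects %s" % (field.name, field.dtype.__name__))
    return True

print(validate_row(("UA", "United Air Lines Inc."), schema))
```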

In [15]:
print(airports.schema)
print('\n')
print(flights.schema)
print('\n')
print(airlines.schema)

StructType(List(StructField(IATA_CODE,StringType,true),StructField(AIRPORT,StringType,true),StructField(CITY,StringType,true),StructField(STATE,StringType,true),StructField(COUNTRY,StringType,true),StructField(LATITUDE,DoubleType,true),StructField(LONGITUDE,DoubleType,true)))


StructType(List(StructField(YEAR,IntegerType,true),StructField(MONTH,IntegerType,true),StructField(DAY,IntegerType,true),StructField(DAY_OF_WEEK,IntegerType,true),StructField(AIRLINE,StringType,true),StructField(FLIGHT_NUMBER,IntegerType,true),StructField(TAIL_NUMBER,StringType,true),StructField(ORIGIN_AIRPORT,StringType,true),StructField(DESTINATION_AIRPORT,StringType,true),StructField(SCHEDULED_DEPARTURE,IntegerType,true),StructField(DEPARTURE_TIME,IntegerType,true),StructField(DEPARTURE_DELAY,IntegerType,true),StructField(TAXI_OUT,IntegerType,true),StructField(WHEELS_OFF,IntegerType,true),StructField(SCHEDULED_TIME,IntegerType,true),StructField(ELAPSED_TIME,IntegerType,true),StructField(AIR_TIME,IntegerType,t

Now we will reload the airlines data with a manually defined schema. Only the column names change, to demonstrate the functionality.

In [16]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [17]:
myManualSchema = StructType([
  StructField("AIRLINE_IATA_CODE", StringType(), True),
  StructField("AIRLINE_NAME", StringType(), True)
])

In [18]:
#inferSchema is omitted because the explicit schema below takes precedence
test_airlines = spark.read.format('csv')\
    .options(header='true')\
    .schema(myManualSchema)\
    .load('flight-delays/airlines.csv')

In [19]:
test_airlines.printSchema()

root
 |-- AIRLINE_IATA_CODE: string (nullable = true)
 |-- AIRLINE_NAME: string (nullable = true)



Columns in Spark are similar to columns in a spreadsheet. You can select, manipulate, and remove columns from DataFrames with operations called expressions. To Spark, columns are logical constructions that simply represent a value computed on a per-record basis by means of an expression. This means that to have a real value for a column, we need to have a row; and to have a row, we need to have a DataFrame. You cannot manipulate an individual column outside the context of a DataFrame; you must use Spark transformations within a DataFrame to modify the contents of a column.

Columns are not resolved until we compare the column names with those we are maintaining in the catalog. Column and table resolution happens in the analyzer phase. If you need to refer to a specific DataFrame’s column, you can use the col method on the specific DataFrame (in PySpark, index the DataFrame instead, e.g. test_airlines["AIRLINE_NAME"]).

An expression is a set of transformations on one or more values in a record in a DataFrame, like a function that takes as input one or more column names, resolves them, and then potentially applies more expressions to create a single value for each record in the dataset. This “single value” can actually be a complex type like a Map or Array.

In the simplest case, an expression, created via the expr function, is just a DataFrame column reference: expr("someCol") is equivalent to col("someCol").

In [20]:
test_airlines.columns

['AIRLINE_IATA_CODE', 'AIRLINE_NAME']

In [21]:
test_airlines.first()

Row(AIRLINE_IATA_CODE=u'UA', AIRLINE_NAME=u'United Air Lines Inc.')

You can create rows by manually instantiating a Row object with the values that belong in each column. It’s important to note that only DataFrames have schemas. Rows themselves do not have schemas. This means that if you create a Row manually, you must specify the values in the same order as the schema of the DataFrame to which they might be appended.

In [22]:
from pyspark.sql import Row
myRow = Row("DA","DreamAirlines")

In [23]:
print(myRow[0])
print(myRow[1])

DA
DreamAirlines


Views can be created on top of DataFrames in order to manipulate data with SQL

In [24]:
test_airlines.createOrReplaceTempView("test_airlines_view")

In [25]:
#in SQL
spark.sql("""SELECT AIRLINE_NAME, AIRLINE_IATA_CODE FROM test_airlines_view LIMIT 4""").take(4)

[Row(AIRLINE_NAME=u'United Air Lines Inc.', AIRLINE_IATA_CODE=u'UA'),
 Row(AIRLINE_NAME=u'American Airlines Inc.', AIRLINE_IATA_CODE=u'AA'),
 Row(AIRLINE_NAME=u'US Airways Inc.', AIRLINE_IATA_CODE=u'US'),
 Row(AIRLINE_NAME=u'Frontier Airlines Inc.', AIRLINE_IATA_CODE=u'F9')]

In [26]:
test_airlines.select("AIRLINE_NAME", "AIRLINE_IATA_CODE").take(4)

[Row(AIRLINE_NAME=u'United Air Lines Inc.', AIRLINE_IATA_CODE=u'UA'),
 Row(AIRLINE_NAME=u'American Airlines Inc.', AIRLINE_IATA_CODE=u'AA'),
 Row(AIRLINE_NAME=u'US Airways Inc.', AIRLINE_IATA_CODE=u'US'),
 Row(AIRLINE_NAME=u'Frontier Airlines Inc.', AIRLINE_IATA_CODE=u'F9')]

In [27]:
#More ways to select the same column
from pyspark.sql.functions import expr, col, column
test_airlines.select(
    expr("AIRLINE_NAME"),
    col("AIRLINE_NAME"),
    column("AIRLINE_NAME"))\
  .take(3)

[Row(AIRLINE_NAME=u'United Air Lines Inc.', AIRLINE_NAME=u'United Air Lines Inc.', AIRLINE_NAME=u'United Air Lines Inc.'),
 Row(AIRLINE_NAME=u'American Airlines Inc.', AIRLINE_NAME=u'American Airlines Inc.', AIRLINE_NAME=u'American Airlines Inc.'),
 Row(AIRLINE_NAME=u'US Airways Inc.', AIRLINE_NAME=u'US Airways Inc.', AIRLINE_NAME=u'US Airways Inc.')]

expr is the most flexible reference that we can use. It can refer to a plain column or a string manipulation of a column. To illustrate, let’s rename the column, first with the AS keyword inside expr and then with the alias method on the column.

In [28]:
test_airlines.select(expr("AIRLINE_NAME AS airline")).take(2)

[Row(airline=u'United Air Lines Inc.'), Row(airline=u'American Airlines Inc.')]

In [29]:
test_airlines.select(expr("AIRLINE_NAME")\
                         .alias("airline"))\
                         .take(2)

[Row(airline=u'United Air Lines Inc.'), Row(airline=u'American Airlines Inc.')]

Because select followed by a series of expr calls is such a common pattern, Spark offers selectExpr as a shorthand. Use it when you need to select multiple columns via expressions.

In [30]:
test_airlines.selectExpr("AIRLINE_NAME","AIRLINE_IATA_CODE")\
                         .take(2)

[Row(AIRLINE_NAME=u'United Air Lines Inc.', AIRLINE_IATA_CODE=u'UA'),
 Row(AIRLINE_NAME=u'American Airlines Inc.', AIRLINE_IATA_CODE=u'AA')]

Sometimes, we need to pass explicit values into Spark that are just a value (rather than a new column). This might be a constant value or something we’ll need to compare to later on. The way we do this is through literals. This is basically a translation from a given programming language’s literal value to one that Spark understands. Literals are expressions and you can use them in the same way:

In [31]:
#This adds the constant "One" with value 1
from pyspark.sql.functions import lit
test_airlines.select(expr("*"), lit(1).alias("One")).take(2)

[Row(AIRLINE_IATA_CODE=u'UA', AIRLINE_NAME=u'United Air Lines Inc.', One=1),
 Row(AIRLINE_IATA_CODE=u'AA', AIRLINE_NAME=u'American Airlines Inc.', One=1)]

Adding Columns: use the withColumn method

In [32]:
test_airlines.withColumn("numberOne", lit(1)).take(2)

[Row(AIRLINE_IATA_CODE=u'UA', AIRLINE_NAME=u'United Air Lines Inc.', numberOne=1),
 Row(AIRLINE_IATA_CODE=u'AA', AIRLINE_NAME=u'American Airlines Inc.', numberOne=1)]

Now add a Boolean column "flag" that is true where the expr condition holds:

In [33]:
test_airlines = test_airlines.withColumn("flag", expr("AIRLINE_IATA_CODE == 'UA'"))

Use the withColumnRenamed method to rename columns

In [34]:
test_airlines = test_airlines.withColumnRenamed("flag", "UA_FLAG")

In [35]:
test_airlines.columns

['AIRLINE_IATA_CODE', 'AIRLINE_NAME', 'UA_FLAG']

Use drop to delete one or multiple columns from a DataFrame

In [36]:
test_airlines.drop("AIRLINE_NAME").columns

['AIRLINE_IATA_CODE', 'UA_FLAG']

In [37]:
test_airlines.drop("AIRLINE_NAME","AIRLINE_IATA_CODE").columns

['UA_FLAG']

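The withColumn method can also be used to change datatypes by combining it with cast. Here the Boolean flag is cast to long (displayed as bigint), even though the new column is named UA_FLAG_FLOAT.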

In [38]:
test_airlines.withColumn("UA_FLAG_FLOAT",col("UA_FLAG").cast("long"))

DataFrame[AIRLINE_IATA_CODE: string, AIRLINE_NAME: string, UA_FLAG: boolean, UA_FLAG_FLOAT: bigint]

Filtering Rows: you can use where or filter and they both will perform the same operation and accept the same argument types when used with DataFrames

In [45]:
print(test_airlines.filter(col("UA_FLAG") == 1).take(10))
print(test_airlines.where("UA_FLAG == 1").take(10))

[Row(AIRLINE_IATA_CODE=u'UA', AIRLINE_NAME=u'United Air Lines Inc.', UA_FLAG=True)]
[Row(AIRLINE_IATA_CODE=u'UA', AIRLINE_NAME=u'United Air Lines Inc.', UA_FLAG=True)]


Spark automatically performs all filtering operations at the same time regardless of the filter ordering. This means that if you want to specify multiple AND filters, just chain them sequentially and let Spark handle the rest

In [49]:
test_airlines.where(col("UA_FLAG") == 1)\
    .where(col("AIRLINE_IATA_CODE") == "UA")\
    .take(2)

[Row(AIRLINE_IATA_CODE=u'UA', AIRLINE_NAME=u'United Air Lines Inc.', UA_FLAG=True)]
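
Since the chained conditions are combined with AND, the order in which they are written does not change the result. A plain-Python check of that property on a few of the airline rows shown earlier (the tuples and predicate names are illustrative, and this is not Spark code):

```python
# (IATA code, airline name, UA flag) for a few rows from the airlines data.
rows = [
    ("UA", "United Air Lines Inc.", True),
    ("AA", "American Airlines Inc.", False),
    ("US", "US Airways Inc.", False),
    ("F9", "Frontier Airlines Inc.", False),
]

def is_flagged(row):
    return row[2] is True

def is_united(row):
    return row[0] == "UA"

# Apply the two filters in both orders.
first_order = [r for r in rows if is_flagged(r) if is_united(r)]
second_order = [r for r in rows if is_united(r) if is_flagged(r)]

print(first_order == second_order, first_order)
```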

Unique Rows

In [50]:
print(test_airlines.select("UA_FLAG", "AIRLINE_IATA_CODE").distinct().count())

14


In [51]:
#Sampling
seed = 5
withReplacement = False
fraction = 0.5
test_airlines.sample(withReplacement, fraction, seed).count()

6
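
The count is 6 rather than exactly half of the 14 rows counted above because, without replacement, each row is kept independently with probability `fraction`, so the fraction is an expected value rather than a guarantee. A plain-Python sketch of that behaviour follows. It models the idea with Python's random module and a hypothetical `bernoulli_sample` helper, so it will not reproduce Spark's exact selection for a given seed.

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(14))  # stand-ins for the 14 airline rows

# Different seeds give different sample sizes around 14 * 0.5 = 7.
sizes = [len(bernoulli_sample(rows, 0.5, seed)) for seed in range(10)]
print(sizes)
```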

Random splits can be helpful when you need to break up your DataFrame into random “splits” of the original DataFrame. This is often used with machine learning algorithms to create training, validation, and test sets.

In [53]:
dataFrames = test_airlines.randomSplit([0.25, 0.75], seed)

In [54]:
dataFrames[0].take(2)

[Row(AIRLINE_IATA_CODE=u'AA', AIRLINE_NAME=u'American Airlines Inc.', UA_FLAG=False),
 Row(AIRLINE_IATA_CODE=u'MQ', AIRLINE_NAME=u'American Eagle Airlines Inc.', UA_FLAG=False)]

In [55]:
dataFrames[1].take(2)

[Row(AIRLINE_IATA_CODE=u'AS', AIRLINE_NAME=u'Alaska Airlines Inc.', UA_FLAG=False),
 Row(AIRLINE_IATA_CODE=u'B6', AIRLINE_NAME=u'JetBlue Airways', UA_FLAG=False)]

In [56]:
dataFrames[0].count() > dataFrames[1].count()

False
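
A sketch of how a weighted random split can work: the weights are normalized into cumulative boundaries, and each row goes to the split whose interval contains its random draw. This is a plain-Python model with a hypothetical `random_split` helper, not Spark's implementation. Because the assignment is random, the split sizes only approximate the weights, so the comparison above is likely but not guaranteed to be False.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split according to normalized weights."""
    total = float(sum(weights))
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point drift
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, upper in enumerate(bounds):
            if draw < upper:
                splits[index].append(row)
                break
    return splits

rows = list(range(14))
small, large = random_split(rows, [0.25, 0.75], seed=5)
print(len(small), len(large))
```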

In [59]:
print(type(dataFrames))
print(type(dataFrames[0]))

<type 'list'>
<class 'pyspark.sql.dataframe.DataFrame'>


DataFrames are immutable, so you cannot append rows to one in place. To append, you must union the original DataFrame with the new DataFrame, which simply concatenates the two. The two DataFrames must have the same schema and number of columns; otherwise, the union will fail.

The airlines and test_airlines DataFrames now have similar schemas, so we will modify them to have the same schema and then union them.

In [60]:
airlines.printSchema()
test_airlines.printSchema()

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)

root
 |-- AIRLINE_IATA_CODE: string (nullable = true)
 |-- AIRLINE_NAME: string (nullable = true)
 |-- UA_FLAG: boolean (nullable = true)



In [63]:
test_airlines = test_airlines.drop("UA_FLAG")\
    .withColumnRenamed('AIRLINE_IATA_CODE','IATA_CODE')\
    .withColumnRenamed('AIRLINE_NAME','AIRLINE')

In [65]:
test_airlines.printSchema()
airlines.printSchema()
test_airlines.schema == airlines.schema  #printSchema() returns None, so compare the schemas

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)



True
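
A note on this check: printSchema() prints the schema and returns None, so an expression such as test_airlines.printSchema() == airlines.printSchema() evaluates to True for any two DataFrames. Comparing the schema attributes is what actually tests equality. The effect can be reproduced in plain Python with a hypothetical print-only function:

```python
def print_only(text):
    """Print text and, like printSchema(), return None."""
    print(text)

# Both calls return None, so the comparison is True even for different input.
always_true = print_only("schema A") == print_only("schema B")
print(always_true)
```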

In [77]:
#Notice this union will return the same row twice
test_airlines.union(airlines)\
    .where(col("IATA_CODE") == "AA")\
    .take(3)

[Row(IATA_CODE=u'AA', AIRLINE=u'American Airlines Inc.'),
 Row(IATA_CODE=u'AA', AIRLINE=u'American Airlines Inc.')]
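
union behaves like SQL's UNION ALL: rows are concatenated by column position and duplicates are kept, which is why the 'AA' row appears twice above. In plain-Python terms, with lists of tuples standing in for the two DataFrames and a set-based step showing the effect that calling distinct() after the union would have:

```python
test_airlines_rows = [("UA", "United Air Lines Inc."), ("AA", "American Airlines Inc.")]
airlines_rows = [("UA", "United Air Lines Inc."), ("AA", "American Airlines Inc.")]

# union: plain concatenation, duplicates included.
unioned = test_airlines_rows + airlines_rows
aa_rows = [row for row in unioned if row[0] == "AA"]

# De-duplicating afterwards, as distinct() would, leaves one row per airline.
deduplicated = sorted(set(unioned))

print(len(aa_rows), len(deduplicated))
```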

Sorting

In [81]:
test_airlines.sort("IATA_CODE").take(5)

[Row(IATA_CODE=u'AA', AIRLINE=u'American Airlines Inc.'),
 Row(IATA_CODE=u'AS', AIRLINE=u'Alaska Airlines Inc.'),
 Row(IATA_CODE=u'B6', AIRLINE=u'JetBlue Airways'),
 Row(IATA_CODE=u'DL', AIRLINE=u'Delta Air Lines Inc.'),
 Row(IATA_CODE=u'EV', AIRLINE=u'Atlantic Southeast Airlines')]

We can also sort on multiple columns, although in this case it won't make a difference.

In [82]:
test_airlines.orderBy("IATA_CODE", "AIRLINE").take(5)

[Row(IATA_CODE=u'AA', AIRLINE=u'American Airlines Inc.'),
 Row(IATA_CODE=u'AS', AIRLINE=u'Alaska Airlines Inc.'),
 Row(IATA_CODE=u'B6', AIRLINE=u'JetBlue Airways'),
 Row(IATA_CODE=u'DL', AIRLINE=u'Delta Air Lines Inc.'),
 Row(IATA_CODE=u'EV', AIRLINE=u'Atlantic Southeast Airlines')]

In [83]:
test_airlines.orderBy(col("IATA_CODE"), col("AIRLINE")).take(5)

[Row(IATA_CODE=u'AA', AIRLINE=u'American Airlines Inc.'),
 Row(IATA_CODE=u'AS', AIRLINE=u'Alaska Airlines Inc.'),
 Row(IATA_CODE=u'B6', AIRLINE=u'JetBlue Airways'),
 Row(IATA_CODE=u'DL', AIRLINE=u'Delta Air Lines Inc.'),
 Row(IATA_CODE=u'EV', AIRLINE=u'Atlantic Southeast Airlines')]

ASC and DESC arguments can also be applied

In [85]:
from pyspark.sql.functions import desc, asc
test_airlines.orderBy(asc("IATA_CODE")).take(2)

[Row(IATA_CODE=u'AA', AIRLINE=u'American Airlines Inc.'),
 Row(IATA_CODE=u'AS', AIRLINE=u'Alaska Airlines Inc.')]

In [86]:
test_airlines.orderBy(col("IATA_CODE").desc(), col("AIRLINE").asc()).take(2)

[Row(IATA_CODE=u'WN', AIRLINE=u'Southwest Airlines Co.'),
 Row(IATA_CODE=u'VX', AIRLINE=u'Virgin America')]

For optimization purposes, it’s sometimes advisable to sort within each partition before another set of transformations. You can use the sortWithinPartitions method to do this

In [89]:
test_airlines.sortWithinPartitions("IATA_CODE")

DataFrame[IATA_CODE: string, AIRLINE: string]

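The limit method restricts the result to the first n rows, much like take. The difference is that limit is a transformation that returns a new DataFrame, while take is an action that returns a list of Rows.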

In [93]:
test_airlines.orderBy(expr("IATA_CODE")).limit(3).take(3)

[Row(IATA_CODE=u'AA', AIRLINE=u'American Airlines Inc.'),
 Row(IATA_CODE=u'AS', AIRLINE=u'Alaska Airlines Inc.'),
 Row(IATA_CODE=u'B6', AIRLINE=u'JetBlue Airways')]

Another important optimization opportunity is to partition the data according to some frequently filtered columns, which controls the physical layout of data across the cluster, including the partitioning scheme and the number of partitions.

Repartition will incur a full shuffle of the data, regardless of whether one is necessary. This means that you should typically only repartition when the future number of partitions is greater than your current number of partitions or when you are looking to partition by a set of columns.

In [94]:
test_airlines.rdd.getNumPartitions()

1

If you know that you’re going to be filtering by a certain column often, it can be worth repartitioning based on that column

In [95]:
test_airlines.repartition(col("IATA_CODE"))

DataFrame[IATA_CODE: string, AIRLINE: string]

Coalesce, on the other hand, will not incur a full shuffle and will try to combine partitions. It can only reduce the number of partitions, never increase it. This operation will shuffle your data into 5 partitions based on the IATA code, and then coalesce them into 2 (without a full shuffle).

In [97]:
test_airlines.repartition(5, col("IATA_CODE")).coalesce(2)

DataFrame[IATA_CODE: string, AIRLINE: string]
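
The difference between the two operations can be sketched in plain Python: coalesce merges whole existing partitions and therefore cannot produce more partitions than it started with, whereas a repartition redistributes individual records. `coalesce` below is a hypothetical stand-in, not the Spark method, and the way partitions are grouped is a simplification.

```python
def coalesce(partitions, target):
    """Merge whole partitions into at most `target` groups; never split one."""
    target = min(target, len(partitions))  # cannot increase the count
    merged = [[] for _ in range(target)]
    for index, partition in enumerate(partitions):
        merged[index % target].extend(partition)
    return merged

five = [["AA", "AS"], ["B6"], ["DL", "EV"], ["F9"], ["UA"]]
print(len(coalesce(five, 2)))            # five partitions merged down to two
print(len(coalesce([["AA", "UA"]], 2)))  # a single partition stays single
```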

In [98]:
spark.stop()