In [1]:
%matplotlib inline
import matplotlib
import seaborn as sns
matplotlib.rcParams['savefig.dpi'] = 144

In [2]:
import random
import toolz
from pyspark import SparkContext, SparkConf

In [3]:
sc = SparkContext("local[*]", "pyspark_df")  # SparkContext was imported in the previous cell
print(sc.version)

# Alternatively...
# conf = SparkConf().setAppName("pyspark_df").setMaster("local[*]")
# sc = SparkContext(conf=conf)

2.0.1


In [4]:
# needed to convert RDDs into DataFrames
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
sqlContext = SQLContext(sc)

# DataFrames

## Motivation and Spark SQL

Spark SQL is the current effort to provide support for writing SQL queries in Spark. Newer versions support Hive, Parquet, and other data sources. [Docs](http://spark.apache.org/docs/latest/sql-programming-guide.html)

**The key feature of Spark SQL is the use of DataFrames instead of RDDs.** A DataFrame is a distributed collection of data organized into named columns. Operations on DataFrames first pass through a query optimizer, called Catalyst, which can simplify and even reorder the requested operations before anything is executed.

Under the hood, operations on DataFrames are boiled down to operations on RDDs, but the RDDs are created by the execution engine, and not directly by the user. It is also possible to convert RDDs to DataFrames and vice versa.

The Spark ML package (`spark.ml`), unlike the older RDD-based MLlib API (`spark.mllib`), uses DataFrames as inputs and outputs.

**Question:** What is an example of a "bad" sequence of operations which should be reordered for optimal performance?
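
One possible answer, sketched in plain Python rather than Spark: running an expensive per-row transformation before a filter that does not depend on its result. The names `expensive_transform`, `naive_plan`, and `reordered_plan` are made up for illustration. The sketch only mimics the idea of moving a filter earlier; it is not how Catalyst is implemented.

```python
import random

calls = {"n": 0}  # counts how often the expensive step runs


def expensive_transform(y):
    """Stand-in for a costly per-row computation (hypothetical)."""
    calls["n"] += 1
    return y * y


def naive_plan(rows):
    # Transform every row first, then filter on x, which the transform never touched.
    transformed = [(x, expensive_transform(y)) for x, y in rows]
    return [(x, y) for x, y in transformed if x < 0.5]


def reordered_plan(rows):
    # Filter first, so the expensive step only runs on the surviving rows.
    kept = [(x, y) for x, y in rows if x < 0.5]
    return [(x, expensive_transform(y)) for x, y in kept]


random.seed(0)
rows = [(random.random(), random.random()) for _ in range(1000)]

calls["n"] = 0
naive_result = naive_plan(rows)
naive_calls = calls["n"]

calls["n"] = 0
reordered_result = reordered_plan(rows)
reordered_calls = calls["n"]
```

Both plans return the same rows, but the reordered one calls the expensive step only for rows that pass the filter.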

DataFrames are...

* Immutable, like RDDs
* Resilient (lineage is remembered), like RDDs
* Lazily executed, like RDDs

So why do we care?


DataFrames are an abstraction that lets us think of data in a familiar form (pandas `DataFrame`, R `data.frame`, SQL table, etc.).

We can use a similar API to RDDs!

Because the data is organized into named, typed columns, Spark can apply SQL-like optimizations and cost analysis to it.

What about type safety?

What are these UDF things?

In [6]:
data = sc.parallelize(range(1, 10001)) \
         .map(lambda x: (random.random(), random.random()))
print(data.take(10))

[(0.9412996385399084, 0.9182080884426133), (0.47911987575213577, 0.7332576191503406), (0.7175711944905967, 0.1233618721391534), (0.7963727228092518, 0.9482954015193551), (0.10048408418147647, 0.3943334445934439), (0.7171379373255168, 0.09663276618523964), (0.4003906924505737, 0.38269793860484813), (0.20947451015299057, 0.9446317056210931), (0.5623255369039984, 0.0625380569007894), (0.1641786776098879, 0.9861059813313088)]


In [7]:
df = data.toDF()
# Note: this isn't always so easy; when Spark cannot infer the column types, you need to specify a schema explicitly

In [8]:
type(df)

pyspark.sql.dataframe.DataFrame

In [9]:
df.printSchema()

root
 |-- _1: double (nullable = true)
 |-- _2: double (nullable = true)



In [10]:
df = df.withColumnRenamed("_1", "x").withColumnRenamed("_2", "y")
df.write.save("parquet_demo_pyspark", format="parquet")
# Another (older) syntax
# df.write.parquet("file:///home/vagrant/datacourse/module5/demo")

Try rerunning the above cell. It fails because the output path already exists and the default save mode is `error`.

Save modes:
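* error (the default): fail if data already exists at the target
* append: add the new rows to the existing data
* overwrite: replace the existing data
* ignore: do nothing if data already exists (i.e., like CREATE TABLE IF NOT EXISTS)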
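
The behavior of the four modes can be mimicked in plain Python. This is only a sketch: the `save` function and the dict standing in for storage are hypothetical, and Spark raises its own exception type rather than `IOError`.

```python
MODES = ("error", "append", "overwrite", "ignore")


def save(store, path, rows, mode="error"):
    """Mimic the four save modes against a dict that stands in for storage."""
    if mode not in MODES:
        raise ValueError("unknown mode: %s" % mode)
    if path not in store:
        store[path] = list(rows)  # nothing there yet: every mode just writes
    elif mode == "error":
        raise IOError("path %s already exists" % path)
    elif mode == "append":
        store[path] = store[path] + list(rows)
    elif mode == "overwrite":
        store[path] = list(rows)
    # mode == "ignore": keep the existing data and silently do nothing


store = {}
save(store, "demo", [1, 2])                 # first write succeeds
save(store, "demo", [3], mode="ignore")     # existing data left untouched
after_ignore = list(store["demo"])
save(store, "demo", [3], mode="append")     # rows added to existing data
after_append = list(store["demo"])
save(store, "demo", [9], mode="overwrite")  # existing data replaced
after_overwrite = list(store["demo"])

try:
    save(store, "demo", [0])                # default mode is "error"
    second_write_failed = False
except IOError:
    second_write_failed = True
```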

In [11]:
df.write.mode("ignore").parquet("parquet_demo_pyspark")

In [12]:
dfp = sqlContext.read.parquet("parquet_demo_pyspark")

In [13]:
dfp.describe("x").show()

+-------+-------------------+
|summary|                  x|
+-------+-------------------+
|  count|              10000|
|   mean| 0.4842625005668251|
| stddev| 0.2916608175852261|
|    min|1.25061073565913E-4|
|    max| 0.9991741902024115|
+-------+-------------------+



In [14]:
filtered_dfp = dfp.filter(dfp["x"] < 0.5)

In [26]:
filtered_dfp.groupby('x').count().show()

+--------------------+-----+
|                   x|count|
+--------------------+-----+
|  0.1786634841158754|    8|
|  0.4431279963842084|    8|
|  0.4722867526068014|    8|
|0.022739968805116972|    8|
| 0.39251107141111063|    8|
|  0.1887173115309988|    8|
| 0.10048408418147647|    8|
| 0.48974282943224756|    8|
| 0.17034516931219545|    8|
| 0.24662161774316504|    8|
| 0.19007598399577674|    8|
|  0.4421469152413511|    8|
| 0.19878832282792658|    8|
| 0.39902255494181316|    8|
|  0.3859785306455894|    8|
| 0.30458693133339176|    8|
|0.014007850951342604|    8|
|  0.4298315917111316|    8|
|  0.4400508508607254|    8|
|  0.3622558171995074|    8|
+--------------------+-----+
only showing top 20 rows



## Exploring the Catalyst Optimizer

In [16]:
filtered_dfp.explain(True)

== Parsed Logical Plan ==
Filter (x#22 < 0.5)
+- Relation[x#22,y#23] parquet

== Analyzed Logical Plan ==
x: double, y: double
Filter (x#22 < 0.5)
+- Relation[x#22,y#23] parquet

== Optimized Logical Plan ==
Filter (isnotnull(x#22) && (x#22 < 0.5))
+- Relation[x#22,y#23] parquet

== Physical Plan ==
*Project [x#22, y#23]
+- *Filter (isnotnull(x#22) && (x#22 < 0.5))
   +- *BatchedScan parquet [x#22,y#23] Format: ParquetFormat, InputPaths: hdfs://localhost/user/vagrant/parquet_demo_pyspark, PartitionFilters: [], PushedFilters: [IsNotNull(x), LessThan(x,0.5)], ReadSchema: struct<x:double,y:double>


In [17]:
filtered_df = df.filter(df["x"] < 0.5)

In [18]:
filtered_df.explain(True)

== Parsed Logical Plan ==
Filter (x#5 < 0.5)
+- Project [x#5, _2#1 AS y#9]
   +- Project [_1#0 AS x#5, _2#1]
      +- LogicalRDD [_1#0, _2#1]

== Analyzed Logical Plan ==
x: double, y: double
Filter (x#5 < 0.5)
+- Project [x#5, _2#1 AS y#9]
   +- Project [_1#0 AS x#5, _2#1]
      +- LogicalRDD [_1#0, _2#1]

== Optimized Logical Plan ==
Project [_1#0 AS x#5, _2#1 AS y#9]
+- Filter (isnotnull(_1#0) && (_1#0 < 0.5))
   +- LogicalRDD [_1#0, _2#1]

== Physical Plan ==
*Project [_1#0 AS x#5, _2#1 AS y#9]
+- *Filter (isnotnull(_1#0) && (_1#0 < 0.5))
   +- Scan ExistingRDD[_1#0,_2#1]


In [19]:
filtered_df = df.filter(df["x"] < 0.5).filter(df["y"] < 0.5)

In [20]:
filtered_df.explain(True)

== Parsed Logical Plan ==
Filter (y#9 < 0.5)
+- Filter (x#5 < 0.5)
   +- Project [x#5, _2#1 AS y#9]
      +- Project [_1#0 AS x#5, _2#1]
         +- LogicalRDD [_1#0, _2#1]

== Analyzed Logical Plan ==
x: double, y: double
Filter (y#9 < 0.5)
+- Filter (x#5 < 0.5)
   +- Project [x#5, _2#1 AS y#9]
      +- Project [_1#0 AS x#5, _2#1]
         +- LogicalRDD [_1#0, _2#1]

== Optimized Logical Plan ==
Project [_1#0 AS x#5, _2#1 AS y#9]
+- Filter (((isnotnull(_1#0) && (_1#0 < 0.5)) && isnotnull(_2#1)) && (_2#1 < 0.5))
   +- LogicalRDD [_1#0, _2#1]

== Physical Plan ==
*Project [_1#0 AS x#5, _2#1 AS y#9]
+- *Filter (((isnotnull(_1#0) && (_1#0 < 0.5)) && isnotnull(_2#1)) && (_2#1 < 0.5))
   +- Scan ExistingRDD[_1#0,_2#1]


In [21]:
filtered_dfp = dfp.filter(dfp["x"] < 0.5).filter(dfp["y"] < 0.5)

In [22]:
filtered_dfp.explain(True)

== Parsed Logical Plan ==
Filter (y#23 < 0.5)
+- Filter (x#22 < 0.5)
   +- Relation[x#22,y#23] parquet

== Analyzed Logical Plan ==
x: double, y: double
Filter (y#23 < 0.5)
+- Filter (x#22 < 0.5)
   +- Relation[x#22,y#23] parquet

== Optimized Logical Plan ==
Filter (((isnotnull(x#22) && (x#22 < 0.5)) && isnotnull(y#23)) && (y#23 < 0.5))
+- Relation[x#22,y#23] parquet

== Physical Plan ==
*Project [x#22, y#23]
+- *Filter (((isnotnull(x#22) && (x#22 < 0.5)) && isnotnull(y#23)) && (y#23 < 0.5))
   +- *BatchedScan parquet [x#22,y#23] Format: ParquetFormat, InputPaths: hdfs://localhost/user/vagrant/parquet_demo_pyspark, PartitionFilters: [], PushedFilters: [IsNotNull(x), LessThan(x,0.5), IsNotNull(y), LessThan(y,0.5)], ReadSchema: struct<x:double,y:double>


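Under the hood, Catalyst represents each plan as a tree and rewrites it by applying rules, which is how the two chained filters above end up merged into a single `Filter`.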
The introductory [blog post](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html) has good pictures.
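
As a toy illustration of rule-based tree rewriting, the sketch below merges two stacked filters into one, loosely mirroring the change between the parsed and optimized plans above. The `Relation` and `Filter` node types and the `combine_filters` rule are hypothetical stand-ins written for this example, not Spark classes.

```python
from collections import namedtuple

# Minimal plan nodes (hypothetical, not Spark classes).
Relation = namedtuple("Relation", ["columns"])
Filter = namedtuple("Filter", ["condition", "child"])


def combine_filters(plan):
    """Rule: Filter(c2, Filter(c1, child)) -> Filter(c1 && c2, child)."""
    if isinstance(plan, Filter) and isinstance(plan.child, Filter):
        inner = plan.child
        merged = "(%s) && (%s)" % (inner.condition, plan.condition)
        return Filter(merged, inner.child)
    return plan


def optimize(plan):
    """Rewrite the child first, then apply the rule here until nothing changes."""
    if isinstance(plan, Filter):
        plan = Filter(plan.condition, optimize(plan.child))
    while True:
        rewritten = combine_filters(plan)
        if rewritten == plan:
            return plan
        plan = rewritten


parsed = Filter("y < 0.5", Filter("x < 0.5", Relation(("x", "y"))))
optimized = optimize(parsed)
```

Here `optimized` is a single `Filter` node sitting directly on top of the `Relation`, with both conditions joined by `&&`.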

### Project Tungsten

* Explicit memory management and binary data layout (less JVM object overhead and garbage collection)
* Cache-aware computation
* Codegen (compile queries into Java bytecode)

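Cache-aware computation example (layout of the array being sorted):
* Case 1: pointer -> key, value
* Case 2: key, pointer -> key, value

Sorting has to compare keys. In case 1 every comparison follows a pointer to a record somewhere else in memory; in case 2 the key sits next to the pointer, so comparisons can be served from the CPU cache.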
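
Python cannot demonstrate CPU cache effects, but the difference between the two layouts can be sketched by counting how often a comparison has to follow a pointer. Everything here (`records`, `load`, the counter) is a hypothetical model written for this example, not Tungsten code.

```python
from functools import cmp_to_key

records = {0: ("pear", 3), 1: ("apple", 7), 2: ("fig", 1)}  # address -> (key, value)
derefs = {"n": 0}


def load(pointer):
    """Follow a pointer to the full record; on real hardware this is the likely cache miss."""
    derefs["n"] += 1
    return records[pointer]


def compare_keys(a, b):
    return (a > b) - (a < b)


# Case 1: the sort array holds only pointers, so each comparison loads two records.
def compare_pointers(p, q):
    return compare_keys(load(p)[0], load(q)[0])


case1 = sorted(records, key=cmp_to_key(compare_pointers))
case1_derefs = derefs["n"]

# Case 2: the sort array holds (key, pointer) pairs, built once before sorting,
# so comparisons only touch the array itself.
derefs["n"] = 0
pairs = [(records[p][0], p) for p in records]
ordered = sorted(pairs, key=cmp_to_key(lambda a, b: compare_keys(a[0], b[0])))
case2 = [p for _, p in ordered]
case2_derefs = derefs["n"]
```

Both cases produce the same ordering of pointers; only case 1 dereferences pointers during comparisons.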

[More](https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html)

### DataFrame performance and tuning

See [here](http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning) for details.

## SQL and DataFrames

In [23]:
# Requires Hive to permanently store tables
# Note: registerTempTable is deprecated as of Spark 2.0 in favor of createOrReplaceTempView
df.registerTempTable('nums')  # This is NOT the same as a temp table in SQL proper
sql_df = sqlContext.sql("select x, y from nums where y > 0.9 limit 3")
sql_df.show()

+-------------------+------------------+
|                  x|                 y|
+-------------------+------------------+
| 0.9412996385399084|0.9182080884426133|
| 0.7963727228092518|0.9482954015193551|
|0.20947451015299057|0.9446317056210931|
+-------------------+------------------+



In [24]:
sql_df.explain(True)

== Parsed Logical Plan ==
'GlobalLimit 3
+- 'LocalLimit 3
   +- 'Project ['x, 'y]
      +- 'Filter ('y > 0.9)
         +- 'UnresolvedRelation `nums`

== Analyzed Logical Plan ==
x: double, y: double
GlobalLimit 3
+- LocalLimit 3
   +- Project [x#5, y#9]
      +- Filter (y#9 > cast(0.9 as double))
         +- SubqueryAlias nums
            +- Project [x#5, _2#1 AS y#9]
               +- Project [_1#0 AS x#5, _2#1]
                  +- LogicalRDD [_1#0, _2#1]

== Optimized Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- Project [_1#0 AS x#5, _2#1 AS y#9]
      +- Filter (isnotnull(_2#1) && (_2#1 > 0.9))
         +- LogicalRDD [_1#0, _2#1]

== Physical Plan ==
CollectLimit 3
+- *Project [_1#0 AS x#5, _2#1 AS y#9]
   +- *Filter (isnotnull(_2#1) && (_2#1 > 0.9))
      +- Scan ExistingRDD[_1#0,_2#1]


*Reminder:* Check the UI (port 4040 by default) for tables in memory.

*Reminder:* A number of interactive tutorials are available on the Databricks [community cloud](https://community.cloud.databricks.com). I highly recommend making an account and checking out the guide.

This is also a good place to learn about connecting to databases like Cassandra or connecting over JDBC.

## Adding columns and functions

Because DataFrames are immutable, adding new information means creating a new DataFrame with additional columns rather than modifying the existing one.

In [27]:
# Currying lets us specify some of a function's arguments and delay specifying the rest until later.

@toolz.curry
def prediction(threshold, val):
    if val > threshold:
        return 1.0
    else:
        return 0.0
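
As a rough standard-library illustration of the same idea, `functools.partial` can fix the `threshold` argument ahead of time. This sketch avoids the `toolz` dependency and makes no claim about how `toolz.curry` works internally; `plain_prediction`, `above_half`, and `above_ninety` are names made up for this example.

```python
from functools import partial


def plain_prediction(threshold, val):
    """Return 1.0 when val exceeds threshold, else 0.0."""
    return 1.0 if val > threshold else 0.0


# Fix the first argument now; supply val later.
above_half = partial(plain_prediction, 0.5)
above_ninety = partial(plain_prediction, 0.9)

labels = [above_half(0.7), above_half(0.3), above_ninety(0.7)]
```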

In [28]:
x_labelizer = udf(prediction(0.5), DoubleType())
y_labelizer = udf(prediction(0.9), DoubleType())

In [29]:
new_df = dfp.withColumn("x_label", x_labelizer("x")).withColumn("y_label", y_labelizer("y"))

In [30]:
new_df.show()

+--------------------+-------------------+-------+-------+
|                   x|                  y|x_label|y_label|
+--------------------+-------------------+-------+-------+
|  0.9412996385399084| 0.9182080884426133|    1.0|    1.0|
| 0.47911987575213577| 0.7332576191503406|    0.0|    0.0|
|  0.7175711944905967| 0.1233618721391534|    1.0|    0.0|
|  0.7963727228092518| 0.9482954015193551|    1.0|    1.0|
| 0.10048408418147647| 0.3943334445934439|    0.0|    0.0|
|  0.7171379373255168|0.09663276618523964|    1.0|    0.0|
|  0.4003906924505737|0.38269793860484813|    0.0|    0.0|
| 0.20947451015299057| 0.9446317056210931|    0.0|    1.0|
|  0.5623255369039984| 0.0625380569007894|    1.0|    0.0|
|  0.1641786776098879| 0.9861059813313088|    0.0|    1.0|
|  0.4065774604450405|0.13000681340291398|    0.0|    0.0|
| 0.09012319588635609|0.39413725067256633|    0.0|    0.0|
|0.002362154337117...| 0.9792537470089171|    0.0|    1.0|
|  0.5377244513497503| 0.8287849244800605|    1.0|    0.0|

## Type safety and Datasets

In [31]:
rdd = new_df.rdd
row = rdd.take(1)
row

[Row(x=0.9412996385399084, y=0.9182080884426133, x_label=1.0, y_label=1.0)]

In [32]:
# Remember that take always returns a list of results
print(type(row))

<type 'list'>


In [33]:
row = row[0]
print(type(row))

<class 'pyspark.sql.types.Row'>


In Python, we're not too worried about type safety. But it's important to note that in Scala/Java, these Row objects do not carry the static types of the values inside them, so compile-time type safety can be lost when converting from RDDs to DataFrames. [Datasets](http://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes) (fleshed out in Spark 2.0) are a newer incarnation of DataFrames that add encoding information to preserve that type safety.

We can, however, drill into Row objects to extract the information we want.

In [34]:
row[1]

0.9182080884426133

In [35]:
row.asDict()

{'x': 0.9412996385399084,
 'x_label': 1.0,
 'y': 0.9182080884426133,
 'y_label': 1.0}

*Copyright &copy; 2016 The Data Incubator.  All rights reserved.*