In [1]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Spark SQL

## Loading & Saving Data

### Local File System

- Specify 
```
sc.textFile(file:///path)
```

- The filesystem should be available at the same path on all nodes in the cluster for both the master and executors
- If the file is not on all nodes in the cluster, load it locally and then call parallelize to distribute the contents to workers 

### Amazon S3

- S3 (Simple Storage Service) – Web Storage	Service

In [4]:
txt = sc.textFile('s3n://usfca-msan694/README.md')

In [5]:
txt

s3n://usfca-msan694/README.md MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [7]:
txt.collect()[:5]

[u'# Apache Spark',
 u'',
 u'Spark is a fast and general cluster computing system for Big Data. It provides',
 u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
 u'supports general computation graphs for data analysis. It also supports a']

### HDFS 

- Designed to work on commodity hardware and be resilient to node failure, while providing high data throughput
- Specify "hdfs://master:port/path" for input and output data

## Spark SQL

### DataFrame

- Handle structured, distributed data with a table-like representation with named column declared with column types
- `RDD`: low-level and direct way of manipulating data in Spark. Do not have to be structured 
- Special case of DataSet type
- Can join, query and save DataFrames
- SparkSQL lets you register DataFrames from different sources (SQL, Parquet, JSON, ORC) as tables in the table catalog and query them. 

#### Create DataFrames

- **Convert existing RDDs** - Load data as text, parse the lines and identify elements
    1. Use RDDs containing row data as tuples 
    2. Specify scheme using `createDataFrame`
        - Use `createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)`

#### 1. Use RDDs containing row data as tuples 
- cannot speicify all schema attributes (e.g. nullable or not)
- convert the RDD element arrays to tuples and call **`toDF`**  on the resulting RDD
- can specify column names when calling the `toDF` method
- all the columns are string type and nullable


### Example 2

- Load the italianPosts.csv parsed with ”~” into an RDD.
- Create DataFrame, using RDDs containing row data as tuples

In [7]:
# Step 1. Load the italianPosts.csv parsed with ”~” into an RDD.
# data = sc.textFile('s3n://usfca-msan694/Italian_Stack_Exchange/italianPosts.csv')
data = sc.textFile('./data/Italian_Stack_Exchange/italianPosts.csv')
data = data.map(lambda x: x.split("~"))

In [24]:
# Step 2. Create DataFrame, using RDDs containing row data as tuples:
dataRDD = data.map(lambda x: tuple([x[i] for i in range(len(x))]))

In [25]:
# HAVE TO SHUT DOWN ALL OTHER NOTEBOOOKS THAT ARE RUNNING
dataRDD.toDF()

DataFrame[_1: string, _2: string, _3: string, _4: string, _5: string, _6: string, _7: string, _8: string, _9: string, _10: string, _11: string, _12: string, _13: string]

In [34]:
dataRDD.toDF().show(n = 3)

+---+--------------------+---+--------------------+---+--------------------+----+--------------------+-------------------+----+----+---+----+
| _1|                  _2| _3|                  _4| _5|                  _6|  _7|                  _8|                 _9| _10| _11|_12| _13|
+---+--------------------+---+--------------------+---+--------------------+----+--------------------+-------------------+----+----+---+----+
|  4|2013-11-11 18:21:...| 17|&lt;p&gt;The infi...| 23|2013-11-10 19:37:...|null|                    |                   |null|null|  2|1165|
|  5|2013-11-10 20:31:...| 12|&lt;p&gt;Come cre...|  1|2013-11-10 19:44:...|  61|Cosa sapreste dir...|&lt;word-choice&gt;|   1|null|  1|1166|
|  2|2013-11-10 20:31:...| 17|&lt;p&gt;Il verbo...|  5|2013-11-10 19:58:...|null|                    |                   |null|null|  2|1167|
+---+--------------------+---+--------------------+---+--------------------+----+--------------------+-------------------+----+----+---+----+
only s

In [33]:
itPostDf = dataRDD.toDF(["commentCount", "lastActivityDate", "ownerUserId", 
     "body", "score", "creationDate", "viewCount", "title", "tags", 
     "answerCount", "acceptedAnswerId", "postTypeId", "id"])

itPostDf.show(n=3)

+------------+--------------------+-----------+--------------------+-----+--------------------+---------+--------------------+-------------------+-----------+----------------+----------+----+
|commentCount|    lastActivityDate|ownerUserId|                body|score|        creationDate|viewCount|               title|               tags|answerCount|acceptedAnswerId|postTypeId|  id|
+------------+--------------------+-----------+--------------------+-----+--------------------+---------+--------------------+-------------------+-----------+----------------+----------+----+
|           4|2013-11-11 18:21:...|         17|&lt;p&gt;The infi...|   23|2013-11-10 19:37:...|     null|                    |                   |       null|            null|         2|1165|
|           5|2013-11-10 20:31:...|         12|&lt;p&gt;Come cre...|    1|2013-11-10 19:44:...|       61|Cosa sapreste dir...|&lt;word-choice&gt;|          1|            null|         1|1166|
|           2|2013-11-10 20:31:...|     

In [32]:
itPostDf.printSchema()

root
 |-- commentCount: string (nullable = true)
 |-- lastActivityDate: string (nullable = true)
 |-- ownerUserId: string (nullable = true)
 |-- body: string (nullable = true)
 |-- score: string (nullable = true)
 |-- creationDate: string (nullable = true)
 |-- viewCount: string (nullable = true)
 |-- title: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- answerCount: string (nullable = true)
 |-- acceptedAnswerId: string (nullable = true)
 |-- postTypeId: string (nullable = true)
 |-- id: string (nullable = true)



### 2. Specify scheme using `createDataFrame`

- Format: 
```python 
createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
```
    + `data`: an RDD of row/tuple/list/dict, list or pd.DataFrame. e.g. 
    ```python
    from pyspark.sql import Row
    Row(val1, val2, val3)
    ```
    + `schema`: a `StructType` or list of column names, cannot be None, **REQUIRED**
    + `samplingRatio`: sample ratio of rows used for inferring the schema
    + `verifySchema`: verify data type of very row against schema


- **`schema`**

    - Example
    ```python
    from pyspark.sql.types import *
    schema = StructType([StructField("name",  StringType(), True), 
                        StructField("age", IntegerType(), True)])
    ```

    - StructType : consisting of a list of StructField
    - StructField: including 
        + **column name** (string), 
        + data type
        + nullable (default: True), 
        + metadata(default: None)
    - data type: `NullType(), StringType(), BinaryType(), BooleanType(), DateType(), TimestampType(), DoubleType(), FloatType(), IntegerType()`, etc.


### Example 3 
Create DataFrame, specifying a schema using `createDataFrame`

In [198]:
# Step 1. Load the italianPosts.csv parsed with ”~” into an RDD.
# data = sc.textFile('s3n://usfca-msan694/Italian_Stack_Exchange/italianPosts.csv')
data = sc.textFile('./data/Italian_Stack_Exchange/italianPosts.csv')
# data = data.map(lambda x: x.split("~"))

In [200]:
from pyspark.sql import Row
from pyspark.sql.types import *
from datetime import datetime

In [201]:
def toIntSafe(inval):
  try: return int(inval)
  except ValueError: return None

def toTimeSafe(inval):
  try: return datetime.strptime(inval, "%Y-%m-%d %H:%M:%S.%f")
  except ValueError: return None

def toLongSafe(inval):
  try: return long(inval)
  except ValueError: return None

In [202]:
def stringToPost(row):
  r = row.encode('utf8').split("~")
  return Row(
    toIntSafe(r[0]),
    toTimeSafe(r[1]),
    toIntSafe(r[2]),
    r[3],
    toIntSafe(r[4]),
    toTimeSafe(r[5]),
    toIntSafe(r[6]),
    toIntSafe(r[7]),
    r[8],
    toIntSafe(r[9]),
    toLongSafe(r[10]),
    toLongSafe(r[11]),
    long(r[12]))

In [203]:
# Step 3: Define a StructType - list of columns
postSchema = StructType([
    StructField("commentCount", IntegerType(), True), 
    StructField("lastActivityDate", TimestampType(), True), 
    StructField("ownerUserId", LongType(), True),
    StructField("body", StringType(), True),
    StructField("score", IntegerType(), True),
    StructField("creationDate", TimestampType(), True),
    StructField("viewCount", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("tags", StringType(), True),
    StructField("answerCount", IntegerType(), True),
    StructField("acceptedAnswerId", LongType(), True),
    StructField("postTypeId", LongType(), True),
    StructField("id", LongType(), False) # id cannot be null

])

In [204]:
# Step 4: Convert input data
rowRDD = data.map(lambda x: stringToPost(x))

In [205]:
# Step 5: Create DataFrame with Schema
sqlContext = SQLContext(sc)
itPostDF = sqlContext.createDataFrame(rowRDD, postSchema)

In [206]:
itPostDF.show(n=3)

+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+-------------------+-----------+----------------+----------+----+
|commentCount|    lastActivityDate|ownerUserId|                body|score|        creationDate|viewCount|title|               tags|answerCount|acceptedAnswerId|postTypeId|  id|
+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+-------------------+-----------+----------------+----------+----+
|           4|2013-11-11 18:21:...|         17|&lt;p&gt;The infi...|   23|2013-11-10 19:37:...|     null| null|                   |       null|            null|         2|1165|
|           5|2013-11-10 20:31:...|         12|&lt;p&gt;Come cre...|    1|2013-11-10 19:44:...|       61| null|&lt;word-choice&gt;|          1|            null|         1|1166|
|           2|2013-11-10 20:31:...|         17|&lt;p&gt;Il verbo...|    5|2013-11-10 19:58:...|     null| null|    

In [207]:
itPostDF.printSchema()

root
 |-- commentCount: integer (nullable = true)
 |-- lastActivityDate: timestamp (nullable = true)
 |-- ownerUserId: long (nullable = true)
 |-- body: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- creationDate: timestamp (nullable = true)
 |-- viewCount: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- answerCount: integer (nullable = true)
 |-- acceptedAnswerId: long (nullable = true)
 |-- postTypeId: long (nullable = true)
 |-- id: long (nullable = false)



### DataFrame API Basics

### Basic APIs
- `select(<column_names>)` or `select(<list of column_names>)`
- `drop()`
- `filter()`, `where()`
- `withColumnRenamed(old_name, new_name)`
- `withColumn()` - rename and add columns
- `orderBy()`, `sort()`
- Check out: 	https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

In [73]:
# select()
itPostDF.select("id", "body").show(n=10)

+----+--------------------+
|  id|                body|
+----+--------------------+
|1165|&lt;p&gt;The infi...|
|1166|&lt;p&gt;Come cre...|
|1167|&lt;p&gt;Il verbo...|
|1168|&lt;p&gt;As part ...|
|1169|&lt;p&gt;&lt;em&g...|
|1170|&lt;p&gt;There's ...|
|1171|&lt;p&gt;As other...|
|1172|&lt;p&gt;The expr...|
|1173|&lt;p&gt;When I w...|
|1174|&lt;p&gt;Wow, wha...|
+----+--------------------+
only showing top 10 rows



In [80]:
# drop() - select all but one/some
itPostDF.drop("lastActivityDate", "creationDate", "tags", "postTypeId", "acceptedAnswerId").show(n=10)

+------------+-----------+--------------------+-----+---------+-----+-----------+----+
|commentCount|ownerUserId|                body|score|viewCount|title|answerCount|  id|
+------------+-----------+--------------------+-----+---------+-----+-----------+----+
|           4|         17|&lt;p&gt;The infi...|   23|     null| null|       null|1165|
|           5|         12|&lt;p&gt;Come cre...|    1|       61| null|          1|1166|
|           2|         17|&lt;p&gt;Il verbo...|    5|     null| null|       null|1167|
|           1|        154|&lt;p&gt;As part ...|   11|      187| null|          4|1168|
|           0|         70|&lt;p&gt;&lt;em&g...|    3|     null| null|       null|1169|
|           2|         17|&lt;p&gt;There's ...|    8|     null| null|       null|1170|
|           1|         63|&lt;p&gt;As other...|    3|     null| null|       null|1171|
|           1|         63|&lt;p&gt;The expr...|    1|     null| null|       null|1172|
|           9|         63|&lt;p&gt;When I w

In [82]:
# filter/where
itPostFiltered = itPostDF.where("id > 2000 and id < 2010")
itPostFiltered.select("id", "body").show(n=10)

+----+--------------------+
|  id|                body|
+----+--------------------+
|2001|&lt;p&gt;Sardinia...|
|2002|&lt;p&gt;I am fro...|
|2003|&lt;p&gt;La rispo...|
|2004|&lt;p&gt;In itali...|
|2005|&lt;p&gt;Mi Ã¨ st...|
|2006|&lt;p&gt;âCa.â...|
|2007|&lt;p&gt;Un chimi...|
|2008|&lt;p&gt;&quot;sp...|
|2009|&lt;p&gt;Ad occhi...|
+----+--------------------+



In [83]:
# withColumnRenamed
itPostFiltered.withColumnRenamed("id", "selected_id")

DataFrame[commentCount: int, lastActivityDate: timestamp, ownerUserId: bigint, body: string, score: int, creationDate: timestamp, viewCount: int, title: string, tags: string, answerCount: int, acceptedAnswerId: bigint, postTypeId: bigint, selected_id: bigint]

In [84]:
#withColumn - new column
itPostColAdd = itPostFiltered.withColumn(
               "score_div_answer", 
                itPostFiltered['score']/itPostFiltered['answerCount']
                )

In [89]:
itPostColAdd.filter(itPostColAdd['score_div_answer'].isNotNull())\
            .select("score", "answerCount", "score_div_answer").show()

+-----+-----------+----------------+
|score|answerCount|score_div_answer|
+-----+-----------+----------------+
|    2|          1|             2.0|
|    2|          1|             2.0|
+-----+-----------+----------------+



In [91]:
# sort
itPostFiltered.sort("id", ascending=False).select("id", "body").show(n=10)

+----+--------------------+
|  id|                body|
+----+--------------------+
|2009|&lt;p&gt;Ad occhi...|
|2008|&lt;p&gt;&quot;sp...|
|2007|&lt;p&gt;Un chimi...|
|2006|&lt;p&gt;âCa.â...|
|2005|&lt;p&gt;Mi Ã¨ st...|
|2004|&lt;p&gt;In itali...|
|2003|&lt;p&gt;La rispo...|
|2002|&lt;p&gt;I am fro...|
|2001|&lt;p&gt;Sardinia...|
+----+--------------------+



In [92]:
# orderBy
itPostFiltered.orderBy("id", ascending=False).select("id", "body").show(n=10)

+----+--------------------+
|  id|                body|
+----+--------------------+
|2009|&lt;p&gt;Ad occhi...|
|2008|&lt;p&gt;&quot;sp...|
|2007|&lt;p&gt;Un chimi...|
|2006|&lt;p&gt;âCa.â...|
|2005|&lt;p&gt;Mi Ã¨ st...|
|2004|&lt;p&gt;In itali...|
|2003|&lt;p&gt;La rispo...|
|2002|&lt;p&gt;I am fro...|
|2001|&lt;p&gt;Sardinia...|
+----+--------------------+



### Example 5

Create a dataframe with	first 3	columns	in the file	to have: 
+ Id: long, not nullable
+ commentDate: timestamp, nullable
+ comment: string, nullable

In [139]:
data = sc.textFile('./data/Italian_Stack_Exchange/italianComments.csv')
data = data.map(lambda x: x.encode('utf8').split("~"))
data_selected = data.map(lambda x: [x[0], x[1], x[2]])

In [140]:
data.take(1)

[['18',
  '2013-11-05 20:39:30.453',
  "It's going to be really hard to answer in English...",
  '1',
  '4',
  '1']]

In [141]:
data_selected.take(1)

[['18',
  '2013-11-05 20:39:30.453',
  "It's going to be really hard to answer in English..."]]

In [142]:
def stringToPost(r):
  # r = row.encode('utf8').split("~")
  return Row(
    long(r[0]), 
    toTimeSafe(r[1]),
    r[2])

In [143]:
# Create a Schema
theSchema = StructType([
    StructField("Id", LongType(), False)
    , StructField("commentDate", TimestampType(), True)
    , StructField('comment', StringType(), True)
])

In [144]:
# Convert input data
sqlContext = SQLContext(sc)
rowRDD = data_selected.map(lambda x: stringToPost(x))
df = sqlContext.createDataFrame(rowRDD, theSchema)

In [145]:
df.show(n=3)

+---+--------------------+--------------------+
| Id|         commentDate|             comment|
+---+--------------------+--------------------+
| 18|2013-11-05 20:39:...|It's going to be ...|
|  6|2013-11-05 20:41:...|Why not &quot;IL ...|
| 18|2013-11-05 20:43:...|    Yep, added that.|
+---+--------------------+--------------------+
only showing top 3 rows



In [146]:
df.printSchema()

root
 |-- Id: long (nullable = false)
 |-- commentDate: timestamp (nullable = true)
 |-- comment: string (nullable = true)



In [157]:
from pyspark.sql.functions import array_contains
df_filtered = df.where(df['comment'].contains("@Daniele"))\
                .where(df_filtered['commentDate'].contains("2013-11-07"))

In [158]:
df_filtered.show()

+---+--------------------+--------------------+
| Id|         commentDate|             comment|
+---+--------------------+--------------------+
| 37|2013-11-07 15:30:...|@Daniele B: I kno...|
+---+--------------------+--------------------+



### Example 6
- Users	can	upvote and downvote for	marking	certain	questions useful or	not.

In [242]:
data = sc.textFile('./data/Italian_Stack_Exchange/italianVotes.csv')

In [243]:
data.take(1)

[u'2657~135~2~2013-11-22 00:00:00.0']

In [244]:
def stringToPost(row):
  r = row.encode('utf8').split("~")
  return Row(
    long(r[1]),
    long(r[0]),
    toIntSafe(r[2]),
    toTimeSafe(r[3])
  )

In [245]:
theSchema = StructType([
    StructField("id", LongType(), False)
    , StructField("postId", LongType(), False)
    , StructField("voteType", IntegerType(), False)
    , StructField("creationTime", TimestampType(), False)
])

In [246]:
rowRDD = data.map(lambda x: stringToPost(x))

In [247]:
sqlContext = SQLContext(sc)
itVoteDF = sqlContext.createDataFrame(rowRDD, theSchema)

In [248]:
itVoteDF.show(n=3)

+---+------+--------+-------------------+
| id|postId|voteType|       creationTime|
+---+------+--------+-------------------+
|135|  2657|       2|2013-11-22 00:00:00|
|142|  2658|       2|2013-11-22 00:00:00|
|142|  2659|       1|2013-11-22 00:00:00|
+---+------+--------+-------------------+
only showing top 3 rows



In [249]:
itVoteDF.printSchema()

root
 |-- id: long (nullable = false)
 |-- postId: long (nullable = false)
 |-- voteType: integer (nullable = false)
 |-- creationTime: timestamp (nullable = false)



In [250]:
itVoteDF.where(itVoteDF["voteType"]== 1).sort("creationTime").show(n=10)

+---+------+--------+-------------------+
| id|postId|voteType|       creationTime|
+---+------+--------+-------------------+
|  2|    46|       1|2013-11-05 00:00:00|
| 12|    53|       1|2013-11-05 00:00:00|
| 36|   195|       1|2013-11-06 00:00:00|
| 30|   220|       1|2013-11-06 00:00:00|
| 55|   232|       1|2013-11-06 00:00:00|
| 45|   181|       1|2013-11-06 00:00:00|
| 52|   216|       1|2013-11-06 00:00:00|
| 77|   364|       1|2013-11-07 00:00:00|
| 83|   375|       1|2013-11-07 00:00:00|
| 84|   380|       1|2013-11-07 00:00:00|
+---+------+--------+-------------------+
only showing top 10 rows



### SQL Functions with DataFrame

- Spark SQL supports most of SQL functions

#### **Scalar Functions**: 
- Return a single value for each row based on calculations on one of the columns
- Math: abs, log, etc. 
- String: length, concat, trim, 
- Time: year, date_add, datediff, next_day

#### **Aggregate Functions**: 
- Return a single value for a group of rows
- Can be used in combination with **`groupBy()`**
- E.g. 
```python
df.groupBy().avg(<column_name>)
df.select(avg(df.column_name))
```

#### **Window Functions**: 
- Return several values for a group of rows
- Unlike aggregate	functions,	they don’t	group rows into a single	 output	per	group

![](window_func.png)

#### **User-defined Functions (UDF)**: 
- Generate custom scalar or aggregate functions
- udf_name = udf(lambda function definition)

### Example 7 - Aggregate Functions

- Add a	column	called	“duration”,	which is difference	between	 ‘lastActivityDate’	and	‘creationDate’ and	sort the DataFrame by ‘duration’ in	 descending	order.
- Calculate	an	average	score per owner	and	sort data by average scores in descending order.

In [195]:
from pyspark.sql.functions import *

In [215]:
# new column duration
itPostDF.withColumn('duration',
                   datediff('lastActivityDate','creationDate'))\
        .sort('duration', ascending=False)\
        .select('duration', 'lastActivityDate', 'creationDate')\
        .show(n=10)

+--------+--------------------+--------------------+
|duration|    lastActivityDate|        creationDate|
+--------+--------------------+--------------------+
|     303|2014-09-11 14:37:...|2013-11-12 13:34:...|
|     301|2014-09-09 08:54:...|2013-11-12 11:03:...|
|     296|2014-09-12 10:55:...|2013-11-20 16:42:...|
|     292|2014-08-31 20:19:...|2013-11-12 12:04:...|
|     291|2014-08-24 16:01:...|2013-11-06 22:12:...|
|     286|2014-09-05 21:35:...|2013-11-23 14:54:...|
|     281|2014-08-19 15:39:...|2013-11-11 18:52:...|
|     278|2014-08-12 12:47:...|2013-11-07 17:43:...|
|     278|2014-08-18 18:06:...|2013-11-13 19:04:...|
|     277|2014-08-31 20:18:...|2013-11-27 10:50:...|
+--------+--------------------+--------------------+
only showing top 10 rows



In [216]:
itPostDF.groupBy('ownerUserId').avg('score')\
        .sort('avg(score)', ascending=False)\
        .show(n=10)

+-----------+----------+
|ownerUserId|avg(score)|
+-----------+----------+
|        570|      15.0|
|          6|      15.0|
|        730|      12.0|
|        729|      11.0|
|        154|      11.0|
|        220|      10.0|
|        217|      10.0|
|        445|       9.0|
|        116|       9.0|
|        656|       9.0|
+-----------+----------+
only showing top 10 rows



### Example 8 - Window Functions

- For each	question, find the id of its owner’s previous question by	creation date.
- For each question, find the id of its ownser's previous and next question by creation date

In [219]:
from pyspark.sql.window import Window

In [229]:
itPostDF.filter(itPostDF.postTypeId == 1)\
        .select('ownerUserId', 'id', 'creationDate',
                lag(itPostDF.id, 1)\
                .over(Window.partitionBy(itPostDF.ownerUserId)\
                            .orderBy('ownerUserId', 'creationDate'))\
                .alias('prev'))\
        .orderBy('ownerUserId', 'creationDate')\
        .show(n=10)

+-----------+----+--------------------+----+
|ownerUserId|  id|        creationDate|prev|
+-----------+----+--------------------+----+
|          4|1637|2014-01-24 06:51:...|null|
|          8|   1|2013-11-05 20:22:...|null|
|          8| 112|2013-11-08 13:14:...|   1|
|          8|1192|2013-11-11 21:01:...| 112|
|          8|1276|2013-11-15 16:09:...|1192|
|          8|1321|2013-11-20 16:42:...|1276|
|          8|1365|2013-11-23 09:09:...|1321|
|         12|  11|2013-11-05 21:30:...|null|
|         12|  17|2013-11-05 22:17:...|  11|
|         12|  18|2013-11-05 22:34:...|  17|
+-----------+----+--------------------+----+
only showing top 10 rows



In [None]:
winDf = itPostDf.filter(postsDf.postTypeId == 1)\
                .select(postsDf.ownerUserId, postsDf.id, postsDf.creationDate,
                        lag(postsDf.id, 1)\
                        .over(Window.partitionBy(postsDf.ownerUserId)\
                            .orderBy(postsDf.creationDate))\
                            .alias("prev"), lead(postsDf.id, 1)\
                        .over(Window.partitionBy(postsDf.ownerUserId)\
                            .orderBy(postsDf.creationDate)).alias("next"))\
                .orderBy(postsDf.ownerUserId, postsDf.id).show()

### Example 9 - User-defined Functions

Create a UDF called `countTags` which count the number of `&lt;` and count the number of them in the tags column

In [230]:
countTags = udf(lambda x: x.count("&lt;"))

In [232]:
itPostDF.select('id', countTags('tags')).show(n=10)

+----+--------------+
|  id|<lambda>(tags)|
+----+--------------+
|1165|             0|
|1166|             1|
|1167|             0|
|1168|             3|
|1169|             0|
|1170|             0|
|1171|             0|
|1172|             0|
|1173|             2|
|1174|             0|
+----+--------------+
only showing top 10 rows



### Example 10 - User-defined Functions

Write a UDF called scoreString which discretize scores: 
- Lower than 10: Low
- Between 10 and 20: Med
- Higher than 20: High

Show the score Strings

In [233]:
def score_discrete(x):
    if x < 10: return 'low'
    elif (x >=10 and x <= 20): return 'med'
    elif x > 20: return 'high'

In [234]:
scoreString = udf(lambda x: score_discrete(x))

In [236]:
itPostDF.select('id', 'score', scoreString('score')).show(n=10)

+----+-----+---------------+
|  id|score|<lambda>(score)|
+----+-----+---------------+
|1165|   23|           high|
|1166|    1|            low|
|1167|    5|            low|
|1168|   11|            med|
|1169|    3|            low|
|1170|    8|            low|
|1171|    3|            low|
|1172|    1|            low|
|1173|    5|            low|
|1174|    5|            low|
+----+-----+---------------+
only showing top 10 rows



### Grouping Data

- Return `GroupedData` object
- Can use an aggregate function or `agg(list_of_agg_func)`

### Example 11

In [241]:
itPostDF.groupBy('ownerUserId')\
        .agg(max('score'), min('score'), avg('score'))\
        .show(n=10)

+-----------+----------+----------+------------------+
|ownerUserId|max(score)|min(score)|        avg(score)|
+-----------+----------+----------+------------------+
|        270|         1|         1|               1.0|
|        730|        12|        12|              12.0|
|        720|         1|         1|               1.0|
|         19|        10|        -1| 3.076923076923077|
|        348|         5|         5|               5.0|
|        415|         5|         1|2.1666666666666665|
|        656|         9|         9|               9.0|
|        736|         1|         1|               1.0|
|         22|        19|         0| 4.886363636363637|
|        198|         5|         5|               5.0|
+-----------+----------+----------+------------------+
only showing top 10 rows



### Joining Data

- Form: `join(df, condition, join_type)`
- Join type: `inner` (default), `outer`, `left_outer`, `right_outer`, `leftsemi`
- Example: 

### Example 13 - Joining

Join Posts DataFrame with Vote DataFrame with	itPostsDFStruct.id == itVotesDFStruct.postId.

In [252]:
itPostDF.join(itVoteDF, itPostDF.id == itVoteDF.postId, 'inner')\
        .show(n=5)

+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+----+------+--------+-------------------+
|commentCount|    lastActivityDate|ownerUserId|                body|score|        creationDate|viewCount|title|                tags|answerCount|acceptedAnswerId|postTypeId|  id|  id|postId|voteType|       creationTime|
+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+----+------+--------+-------------------+
|           1|2013-11-06 00:22:...|         18|&lt;p&gt;&quot;De...|    3|2013-11-06 00:05:...|     null| null|                    |       null|            null|         2|  26|   2|    26|       2|2013-11-05 00:00:00|
|           0|2013-11-07 23:59:...|         56|&lt;p&gt;I know t...|   10|2013-11-06 01:16:...|      143| null|    &lt;prono

## Register DataFrame in the Table Catalog

- You can reference a DataFrame by its name by registering the DataFrame as a table
- Spark stores the table definition in the table catalog
- Save as table (2)

- Example:

```
.registerTempTable(<table_name>) # disappear with the Spark session

.write.saveAsTable(<table_name>) # register table permanently
```
- By writing, the table definitions will survive as the application restarts and are persistent

### Example 14

In [253]:
itPostDF.write.saveAsTable("Post")
itVoteDF.write.saveAsTable("Votes")

- Once the DataFrame is registered as a table, you can query its data using SQL expressions
- Use `sqlContext.sql(query)`
- Can also use spark-sql

### Example 15
Run	“select	* from posts” query.

In [256]:
sqlContext.sql("select * from post").show(n = 5)

+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+
|commentCount|    lastActivityDate|ownerUserId|                body|score|        creationDate|viewCount|title|                tags|answerCount|acceptedAnswerId|postTypeId|  id|
+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+
|           4|2013-11-11 18:21:...|         17|&lt;p&gt;The infi...|   23|2013-11-10 19:37:...|     null| null|                    |       null|            null|         2|1165|
|           5|2013-11-10 20:31:...|         12|&lt;p&gt;Come cre...|    1|2013-11-10 19:44:...|       61| null| &lt;word-choice&gt;|          1|            null|         1|1166|
|           2|2013-11-10 20:31:...|         17|&lt;p&gt;Il verbo...|    5|2013-11-10 19:58:...|     null| null

### Loading and Saving Data using SparkSQL

#### Datatypes

- **JSON**: 
    + Spark can automatically infer a JSON schema
    + `spark.read.json(<file_name>)`
    
    
- **ORC**: Optimixed Row Columnar
    + Columnar format
    + `sqlContext.read.format('orc').load(<file_name>)`
    
    
- **Parquet**
    + Columnar file-based storage format optimized for relational access
    + Designed to be independent of any specific framework and free of unnecessary dependencies
    + `spark.read.parquet(<file_name>)`
    
    
- **Relational DB and other DB with JDBC**
    + Use JDBC drivers (Postgres and others (MySQL))
    + Spark can access several popular databases using either Hadoop connectors or custom
Spark connectors.
    + Spark can load data from any relational databases that supports Java Database
Connectivity (JDBC) including MySQL, Postgres, and other systems.
    + Download a Postgres JDBC jar from https://jdbc.postgresql.org/download.html
    + `pyspark --driver-class-path Postgres_JDBC_location`
    + Format: 
    ```python
    # where <propery_list> is a dict includes user, pass and driver
    from pyspark.sql import DataFrameReader
    df = DataFrameRead(sqlContext).jdbc(
                                    url = 'jdbc:<URL>',
                                    table = '<table_name>',
                                    properties = <property_list>
                                    ) 
    ```
    
#### Saving 
- `.write.format(<format>).saveAsTable(<name>)`

### Example 16 

Read word_bank_project.json as a DataFrame

In [10]:
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

In [7]:
# world_bank_prj = spark.read.json("s3n://usfca-msan694/world_bank_project.json")
world_bank_prj = spark.read.json("./data/world_bank_project.json")

In [8]:
world_bank_prj.show(n=3)

+--------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+----------------+--------------------+------------------------+--------+-----------+-------+----------+--------------------+--------------------+----------------+---------------+--------------------+--------------------+--------------------+--------------------+-----------+--------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+------+------+--------------------+--------------------+--------------------+-----------+---------+------------+--------------------+
|                 _id|approvalfy|board_approval_month|   boardapprovaldate|            borrower|         closin

### Example 17

In [4]:
postDF = sqlContext.sql("select * from post")

In [5]:
postDF.write.format('json').saveAsTable('post_json')