# **Spark SQL Lab**
#### In this lab, you will write code that runs SQL queries in Spark, which makes your analytics simpler and faster.
#### **During this lab we will cover:**
#### *Part 1:* Create a DataFrame (called SchemaRDD before Spark 1.3)
#### *Part 2:* Loading data programmatically
#### *Part 3:* Caching for performance
#### *Part 4:* How many authors are tagged as spam?
#### Reference: [Spark's Python API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) documentation for RDDs

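### **Part 1: Create a DataFrame (called SchemaRDD before Spark 1.3)**

In [14]:
spark_home = "/opt/spark-1.4.1-bin-hadoop2.6"  # Spark installation used in this lab

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

# The pyspark shell predefines `sc`; a plain Python kernel does not (hence the NameError).
if 'sc' not in globals():
    sc = SparkContext(master="local[*]", appName="SparkSQL_lab")  # local mode
sqlContext = SQLContext(sc)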

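#### **(1a) DataFrame from an existing RDD**

In [None]:
# sc.parallelize returns an RDD of Rows: ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:396
rdd = sc.parallelize([Row(name="Gordon", beverage="coffee"),
                      Row(name="Katrina", beverage="tea"),
                      Row(name="Graham", beverage="juice")])
# Convert the RDD into a DataFrame; the schema is inferred from the Row fields.
df = sqlContext.createDataFrame(rdd)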
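To make the schema-inference step concrete, here is a simplified, pure-Python model of what `createDataFrame` derives from the Rows above. It is a hypothetical helper (`infer_schema`), not Spark's implementation: it maps only a few Python types to Spark SQL type names and scans every record, whereas Spark 1.x samples rows. Two details it does mirror: a Python `int` becomes `bigint` (matching the `DataFrame[age: bigint, name: string]` output later in this lab), and `Row(**kwargs)` in Spark 1.x sorts its fields by name.

```python
# Hypothetical, simplified model of Spark 1.x schema inference from Row objects:
# each record is a dict of field -> value, and only a few Python types are mapped.
SPARK_TYPE_NAMES = {bool: "boolean", int: "bigint", float: "double", str: "string"}

def infer_schema(records):
    """Return a 'DataFrame[...]'-style summary of the fields in `records`."""
    fields = {}
    for rec in records:
        for name, value in rec.items():
            # Take the type from the first non-null value seen for each field.
            if value is not None and name not in fields:
                fields[name] = SPARK_TYPE_NAMES[type(value)]
    # Spark 1.x Row(**kwargs) sorts fields by name, so list them alphabetically.
    return "DataFrame[%s]" % ", ".join(
        "%s: %s" % (name, fields[name]) for name in sorted(fields))

beverages = [dict(name="Gordon", beverage="coffee"),
             dict(name="Katrina", beverage="tea"),
             dict(name="Graham", beverage="juice")]
print(infer_schema(beverages))  # DataFrame[beverage: string, name: string]
```

This is why `df` above lists `beverage` before `name` even though each Row was written with `name` first.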

### **Part 2: Loading data programmatically**

#### **(2a) Read a local JSON file into a DataFrame**
#### Thanks to PIXNET for the hashed spam data used in Part 4: [PIXNET HACKATHON 2015](https://pixnethackathon2015.events.pixnet.net/)

In [None]:
jsonfile = "file:///opt/spark-1.4.1-bin-hadoop2.6/examples/src/main/resources/people.json"

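# Spark 1.3: df = sqlContext.jsonFile(jsonfile)
# Spark 1.4+: use the DataFrameReader returned by sqlContext.read
df = sqlContext.read.json(jsonfile)
# Equivalent generic form; both give DataFrame[age: bigint, name: string]
df = sqlContext.read.load(jsonfile, format="json")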
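In Spark 1.x, `sqlContext.read.json` does not accept an ordinary pretty-printed JSON document: each line of the input must be one complete JSON object. The pure-Python sketch below shows that layout and how a field missing from some lines ends up as null. The sample lines are hypothetical records shaped like those in `people.json`, and `read_json_lines` is an illustrative helper, not a Spark API.

```python
import json

# Hypothetical input in the layout sqlContext.read.json expects in Spark 1.x:
# one complete JSON object per line (no enclosing array, no pretty-printing).
sample = ('{"name":"Michael"}\n'
          '{"name":"Andy", "age":30}\n'
          '{"name":"Justin", "age":19}\n')

def read_json_lines(text):
    """Parse one JSON object per non-empty line; a field missing from a line becomes None."""
    records = [json.loads(line) for line in text.splitlines() if line.strip()]
    # Union of all field names, sorted alphabetically like Spark's inferred schema.
    columns = sorted(set(key for rec in records for key in rec))
    rows = [tuple(rec.get(col) for col in columns) for rec in records]
    return columns, rows

columns, rows = read_json_lines(sample)
```

Here `columns` is `age, name` and the first row has `None` for `age`, much as Spark shows `null` for Michael's missing age.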

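#### **(2b) Read data from HDFS**

In [None]:
# Load a text file and convert each line to a Row.
# A path without a scheme (file://, hdfs://) is resolved against the default filesystem: HDFS when Hadoop is configured.
lines = sc.textFile("examples/src/main/resources/people.txt")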
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
peopleDf = sqlContext.createDataFrame(people)
peopleDf.registerTempTable("people")
# DataFrame[age: bigint, name: string]
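The lambdas above raise on any malformed line (for example `IndexError` when a line has no comma), which fails the whole job. A more defensive pattern, sketched here in plain Python with a hypothetical `parse_person` helper and made-up sample lines in the same `name, age` layout, is to parse in a named function that returns `None` for bad input. In the cell above it could be used as `lines.map(parse_person).filter(lambda p: p is not None).map(lambda p: Row(name=p[0], age=p[1]))`.

```python
def parse_person(line):
    """Turn a 'name, age' line into a (name, age) tuple, or None if malformed."""
    parts = [p.strip() for p in line.split(",")]
    if len(parts) != 2 or not parts[1].isdigit():
        return None
    return parts[0], int(parts[1])

# Hypothetical lines in the same "name, age" layout as people.txt.
sample_lines = ["Michael, 29", "Andy, 30", "Justin, 19", "no age here"]
parsed = [p for p in map(parse_person, sample_lines) if p is not None]
```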

In [None]:
# Load a comma-separated text file (author,timestamp,time,action) and convert each line to a Row.
userlog_path = "/user/erica/userlog.txt"

userlog_info = sc.textFile(userlog_path)
userlog_pars = userlog_info.map(lambda l: l.split(","))
userlog = userlog_pars.map(lambda p: Row(author=p[0], timestamp=int(p[1]), time=p[2], action=p[3]))
userlogDF = sqlContext.createDataFrame(userlog)
userlogDF.registerTempTable("userlog")
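Once `userlog` is registered, it can be queried with SQL through `sqlContext.sql(...)`. To prototype a query without a cluster, the sketch below uses Python's built-in `sqlite3` as a local stand-in with the same four columns; the rows are hypothetical. SQLite and Spark SQL dialects differ in places, so treat this as a way to try the query shape, not a guarantee that the string runs unchanged on your Spark version.

```python
import sqlite3

# Local stand-in only: an in-memory SQLite table with the same columns as the
# "userlog" temp table. The rows are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE userlog (author TEXT, "timestamp" INTEGER, "time" TEXT, "action" TEXT)')
conn.executemany("INSERT INTO userlog VALUES (?, ?, ?, ?)", [
    ("a1", 1427817600, "2015-04-01T00:00:00+08:00", "post"),
    ("a1", 1427817660, "2015-04-01T00:01:00+08:00", "post"),
    ("b2", 1427817720, "2015-04-01T00:02:00+08:00", "comment"),
])

# Number of logged actions per author, busiest first (ties broken by author).
query = """SELECT author, COUNT(*) AS n_actions
           FROM userlog
           GROUP BY author
           ORDER BY n_actions DESC, author"""
result = conn.execute(query).fetchall()
```

In the lab, a query like this would be passed as `sqlContext.sql(query)` and the resulting DataFrame inspected with `.collect()`.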

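#### **(2c) Read a Hive table**
#### Even without an existing Hive metastore this works: when no hive-site.xml is configured, HiveContext creates a local metastore (`metastore_db`) in the current directory.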

In [None]:
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

# Raises an error if Spark was built without Hive support
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
# collect() returns a list of Row objects with fields key and value
results = sqlContext.sql("FROM src SELECT key, value").collect()

# DataFrame API equivalent of SELECT * FROM people
peopleDf.select('*').collect()
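The `CREATE TABLE` above gives no `ROW FORMAT` clause, so Hive uses its default text layout, whose field delimiter is the control character `\x01` (Ctrl-A). The pure-Python sketch below parses lines in that layout into `(INT, STRING)` pairs; it assumes `kv1.txt` is stored this way, which is what lets `LOAD DATA` work without a delimiter. The sample lines and the `parse_default_hive_text` helper are illustrative.

```python
# Hive's default field delimiter for plain-text tables is \x01 (Ctrl-A).
# The sample lines are illustrative, in the (key INT, value STRING) layout of src.
sample = "238\x01val_238\n86\x01val_86\n"

def parse_default_hive_text(text):
    """Split each line on \\x01 into an (int key, string value) pair."""
    rows = []
    for line in text.splitlines():
        key, value = line.split("\x01", 1)
        rows.append((int(key), value))
    return rows

rows = parse_default_hive_text(sample)
```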

#### **(2d) User-defined functions (UDFs) in Spark SQL**
#### To use an existing Hive installation, configure it by placing your hive-site.xml file in Spark's conf/ directory.

In [None]:
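# Create a UDF that returns the length of some text.
# registerFunction's return type defaults to StringType, so pass IntegerType() explicitly.
from pyspark.sql.types import IntegerType
sqlContext.registerFunction("strLenPython", lambda x: len(x), IntegerType())

# Temp tables belong to the context that registered them, so register
# "people" again through the HiveContext before querying it.
df = sqlContext.read.json(jsonfile)
df.registerTempTable("people")
results = sqlContext.sql("SELECT name, strLenPython(name) AS lengthName FROM people")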
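A SQL `NULL` reaches a Python UDF as `None`, so `lambda x: len(x)` raises a `TypeError` on any row whose `name` is null. A null-safe variant is sketched below as a plain Python function (`str_len` is a hypothetical name); it could be registered in place of the lambda with `sqlContext.registerFunction("strLenPython", str_len, IntegerType())`.

```python
def str_len(x):
    """Length of a string, or None for SQL NULL (which reaches a Python UDF as None)."""
    return len(x) if x is not None else None

# Local check on plain Python values; the None stands in for a NULL name.
lengths = [str_len(name) for name in ["Michael", "Andy", None]]
```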


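#### **(2e) Saving to persistent tables**
#### `saveAsTable`: saves the contents of a DataFrame to a data source as a table. With a HiveContext, the table is registered in the Hive metastore, so it still exists after the Spark program restarts.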

In [None]:
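# Errors here; note that CSV is not a built-in data source in Spark 1.4 (it needs the external spark-csv package)
# df.saveAsTable('test_table', format='csv', mode='overwrite', path='file:///')

# JSON works; there is no plain-text DataFrame writer in 1.4 either.
# DataFrame.save is deprecated since 1.4 in favor of DataFrame.write
peopleDf.save(path='/Users/etu/data/test_people.json', source='json', mode='overwrite')

# Save as a local Parquet file (1.4 DataFrameWriter API)
peopleDf.write.save("file:///home/erica/happy.parquet", format="parquet")

# Save as a persistent table in the Hive metastore
peopleDf.saveAsTable(tableName="young", source="parquet", mode="overwrite")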
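The `mode` argument above decides what happens when the target already exists. Spark 1.4 accepts `"error"` (the default), `"append"`, `"overwrite"` and `"ignore"`. The toy model below reproduces those four behaviors for a single local JSON-lines file so they can be tried without Spark; `save_json_lines` is hypothetical, and unlike Spark it writes one file rather than a directory of part files.

```python
import json
import os
import tempfile

def save_json_lines(records, path, mode="error"):
    """Toy model of DataFrameWriter save modes for one local JSON-lines file.

    error (default): fail if path exists; append: add to it;
    overwrite: replace it; ignore: do nothing if it exists.
    """
    if mode not in ("error", "append", "overwrite", "ignore"):
        raise ValueError("unknown save mode: %s" % mode)
    exists = os.path.exists(path)
    if exists and mode == "error":
        raise IOError("path already exists: %s" % path)
    if exists and mode == "ignore":
        return
    with open(path, "a" if mode == "append" else "w") as f:
        for rec in records:
            f.write(json.dumps(rec, sort_keys=True) + "\n")

out = os.path.join(tempfile.mkdtemp(), "people.json")
save_json_lines([{"name": "Andy", "age": 30}], out)                   # creates the file
save_json_lines([{"name": "Justin", "age": 19}], out, mode="append")  # now 2 lines
save_json_lines([{"name": "Nobody"}], out, mode="ignore")             # no-op: file exists
with open(out) as f:
    n_lines = len(f.read().splitlines())
```

Calling `save_json_lines(...)` on `out` again with the default mode raises, which is why the cells above pass `mode='overwrite'` when re-running.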

In [None]:
# `people` is an RDD and has no .write; the writer API belongs to the DataFrame peopleDf.
#1.4: saved OK, but loading these files failed later
peopleDf.write.save("file:///Users/etu/data/happy.parquet", format="parquet")
peopleDf.write.save("file:///Users/etu/data/happy.json", format="json")

#1.4: saved and read OK
peopleDf.write.parquet("file:///Users/etu/data/people2.parquet")
df1 = sqlContext.read.parquet("file:///Users/etu/data/people2.parquet")

#1.4: reading the bundled example works
json_ex='/Users/etu/spark-1.4.1-bin-hadoop2.6/examples/src/main/resources/people.json'
df = sqlContext.read.json(json_ex)

#1.4: saved and read OK (read.json and read.load(..., format="json") are equivalent)
peopleDf.write.json("file:///Users/etu/data/people2.json")
df = sqlContext.read.json("file:///Users/etu/data/people2.json")
df2 = sqlContext.read.load("file:///Users/etu/data/people2.json", format="json")
df3 = sqlConte
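Note that `write.json(path)` and `write.parquet(path)` create a directory at `path`, typically holding one `part-*` file per partition plus an empty `_SUCCESS` marker; reading the directory back loads all part files, and Hadoop-based readers skip names starting with `_` or `.`. The sketch below reads such a directory with plain Python. The directory it builds is hypothetical, and `read_spark_json_dir` is an illustrative helper.

```python
import glob
import json
import os
import tempfile

def read_spark_json_dir(path):
    """Read every part-* file in a Spark JSON output directory (one record per line)."""
    records = []
    for part in sorted(glob.glob(os.path.join(path, "part-*"))):
        with open(part) as f:
            records.extend(json.loads(line) for line in f if line.strip())
    return records

# Hypothetical output directory laid out like Spark's: part files plus a _SUCCESS marker.
out_dir = tempfile.mkdtemp()
with open(os.path.join(out_dir, "part-00000"), "w") as f:
    f.write('{"age":30,"name":"Andy"}\n')
with open(os.path.join(out_dir, "part-00001"), "w") as f:
    f.write('{"age":19,"name":"Justin"}\n')
open(os.path.join(out_dir, "_SUCCESS"), "w").close()

records = read_spark_json_dir(out_dir)
```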

In [32]:
import subprocess
print subprocess.check_output(["ls"])

Kaggle_Device.ipynb
Lucky_Draw.ipynb
ML_lab1_review_student.ipynb
PIXNET_Spam_2015.ipynb
SparkSQL_handson_2015.ipynb
UTM_user_detection.ipynb
Vagrantfile
lab1_word_count_student.ipynb
mooc-setup-master



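### **Part 3: Caching for performance**
#### Spark SQL can cache a table in an in-memory columnar format; queries on the cached table then scan only the columns they need.

In [None]:
# sqlContext.cacheTable("tableName")
sqlContext.cacheTable("people")

In [None]:
# Remove the table from the in-memory cache
sqlContext.uncacheTable("people")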
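Why a columnar cache helps: when each column is stored contiguously, a query touching one column never has to read the others. The toy below (not Spark's internal storage) converts row records into column lists and answers a `MAX(age)`-style query from the `age` column alone; the rows and the `to_columnar` helper are hypothetical.

```python
# Toy illustration of row vs. column layout; this is not how Spark stores data internally.
rows = [
    {"name": "Michael", "age": None},
    {"name": "Andy", "age": 30},
    {"name": "Justin", "age": 19},
]

def to_columnar(rows):
    """Turn a list of row dicts (all with the same keys) into a dict of column lists."""
    columns = {}
    for row in rows:
        for key, value in row.items():
            columns.setdefault(key, []).append(value)
    return columns

columns = to_columnar(rows)
# SELECT MAX(age) only needs the "age" list; the "name" values are never read.
max_age = max(a for a in columns["age"] if a is not None)
```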

### **Part 4: How many authors are tagged as spam?**

#### Use the `wordCount()` function and `takeOrdered()` to find the 3 most frequent author IDs and their counts.
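The counting logic can be checked locally first. Assuming `wordCount()` from the earlier word count exercise returns an RDD of `(word, count)` pairs, the Spark version would look like `wordCount(authorsRDD).takeOrdered(3, key=lambda pair: (-pair[1], pair[0]))`, where `authorsRDD` is a hypothetical RDD of author IDs. The pure-Python sketch below plays both roles with `collections.Counter` and `heapq.nsmallest`; the author IDs are made up.

```python
from collections import Counter
import heapq

# Hypothetical author IDs, one per logged action (the real ones are 40-character hashes).
authors = ["a1", "b2", "a1", "c3", "b2", "a1", "c3", "b2", "a1", "d4"]

# Plays the role of wordCount(): (author, count) pairs.
counts = list(Counter(authors).items())

# Plays the role of takeOrdered(3, key=...): largest counts first, ties broken by ID.
top3 = heapq.nsmallest(3, counts, key=lambda pair: (-pair[1], pair[0]))
print(top3)  # [('a1', 4), ('b2', 3), ('c3', 2)]
```

Negating the count turns "smallest first" into "largest first", which is the same trick `takeOrdered` needs; the author ID in the key makes the result deterministic when counts tie.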

In [41]:
spam_data = '/Users/etu/Desktop/kaggle/spam/user-action-log.json'

print subprocess.check_output("cat %s | head -n 7"%(spam_data), shell=True)

[
    {
        "operate_at": 1427817600,
        "operate_date": "2015-04-01T00:00:00+08:00",
        "author": "b1e26f5cbf4d68eee850946a7d788666bffa7cbd",
        "action": "87a4425c5b350b685a97b5c7aa123e74599dd481"
    },
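The output shows that `user-action-log.json` is one pretty-printed JSON array, while `sqlContext.read.json` in Spark 1.x expects one JSON object per line. Assuming the file fits in memory on one machine, one option is to rewrite it as JSON Lines before loading it. The sketch below (`json_array_to_lines` is a hypothetical helper) does the conversion on the first record printed above.

```python
import json

def json_array_to_lines(text):
    """Rewrite a JSON array of objects as JSON Lines text (one object per line)."""
    records = json.loads(text)
    return "".join(json.dumps(rec, sort_keys=True) + "\n" for rec in records)

# The first record of user-action-log.json, as printed above.
sample = """[
    {
        "operate_at": 1427817600,
        "operate_date": "2015-04-01T00:00:00+08:00",
        "author": "b1e26f5cbf4d68eee850946a7d788666bffa7cbd",
        "action": "87a4425c5b350b685a97b5c7aa123e74599dd481"
    }
]"""
converted = json_array_to_lines(sample)
```

For the real file, read `spam_data`, write the converted text to a new file of your choosing, and pass that file's path to `sqlContext.read.json`.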



In [None]:
# TODO: Replace <FILL IN> with appropriate code
# 


In [6]:
from test_helper import Test
# Sanity check that test_helper can be imported and works
y=1
Test.assertEquals(y, 1, "y is incorrect")

1 test passed.


In [4]:
import sys
sys.path.append("/Users/etu/FunTime/spkgroup/test_helper")
print sys.path
