### Load sample log.

The sample log is a CSV file with three columns: (1) date, (2) user ID, (3) campaign ID. It looks like the following.
```
click.at    user.id campaign.id
2015/4/27 20:40 144012  Campaign077
2015/4/27 0:27  24485   Campaign063
2015/4/27 0:28  24485   Campaign063
2015/4/27 0:33  24485   Campaign038
```

In [1]:
import json, os, datetime, collections, commands
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *

if not os.path.exists("./click_data_sample.csv"):
    print "csv file not found at master node, will download and copy to HDFS"
    commands.getoutput("wget -q http://image.gihyo.co.jp/assets/files/book/2015/978-4-7741-7631-4/download/click_data_sample.csv")
    commands.getoutput("hadoop fs -copyFromLocal -f ./click_data_sample.csv /user/hadoop/")

#Read the CSV from HDFS and drop the header line
whole_raw_log = sc.textFile("/user/hadoop/click_data_sample.csv")
header = whole_raw_log.first()
#Split each line on commas, strip the double quotes, and convert the field types
whole_log = whole_raw_log.filter(lambda x: x != header).map(lambda line: line.split(","))\
            .map(lambda line: [datetime.datetime.strptime(line[0].replace('"', ''), '%Y-%m-%d %H:%M:%S'), int(line[1]), line[2].replace('"', '')])

whole_log.take(3)

[[datetime.datetime(2015, 4, 27, 20, 40, 40), 144012, u'Campaign077'],
 [datetime.datetime(2015, 4, 27, 0, 27, 55), 24485, u'Campaign063'],
 [datetime.datetime(2015, 4, 27, 0, 28, 13), 24485, u'Campaign063']]
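
The parsing step in the cell above can also be written as a named function, which is easier to test on its own than chained lambdas. The following is a minimal sketch, not the notebook's code: `parse_line` and the `sample` line are hypothetical, and the raw line format (comma-separated fields with double-quoted strings) is inferred from the `replace('"', '')` calls and the format string used above.

```python
import datetime


def parse_line(line):
    """Parse one CSV line into [access_time, user_id, campaign_id]."""
    # Split on commas and remove surrounding whitespace and double quotes.
    fields = [field.strip().strip('"') for field in line.split(",")]
    access_time = datetime.datetime.strptime(fields[0], "%Y-%m-%d %H:%M:%S")
    return [access_time, int(fields[1]), fields[2]]


# Hypothetical raw line, reconstructed from the first parsed record shown above.
sample = '"2015-04-27 20:40:40",144012,"Campaign077"'
print(parse_line(sample))
```

Assuming the file really matches this format, `parse_line` could be passed to `map` in place of the two lambdas.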

### Create Spark Dataframe
* A DataFrame can be created from an RDD by providing schema information. [See this page for the schema data types](http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types)
* The last argument of `StructField('date', TimestampType(), True)` (True or False) indicates whether null values are allowed.
* A DataFrame can be cached in the same manner as an RDD, with `dataframe.repartition(4).cache()`

In [2]:
fields = [
    StructField('access_time', TimestampType(), True),
    StructField('userID', IntegerType(), True),
    StructField('campaignID', StringType(), True),
]
schema = StructType(fields)

whole_log_df = sqlContext.createDataFrame(whole_log, schema)
#printSchema() and show() print directly and return None, so they are not wrapped in print
whole_log_df.printSchema()
print whole_log_df.dtypes
whole_log_df.show(10)

root
 |-- access_time: timestamp (nullable = true)
 |-- userID: integer (nullable = true)
 |-- campaignID: string (nullable = true)

[('access_time', 'timestamp'), ('userID', 'int'), ('campaignID', 'string')]
+--------------------+------+-----------+
|         access_time|userID| campaignID|
+--------------------+------+-----------+
|2015-04-27 20:40:...|144012|Campaign077|
|2015-04-27 00:27:...| 24485|Campaign063|
|2015-04-27 00:28:...| 24485|Campaign063|
|2015-04-27 00:33:...| 24485|Campaign038|
|2015-04-27 01:00:...| 24485|Campaign063|
|2015-04-27 16:10:...|145066|Campaign103|
|2015-04-27 20:06:...|145066|Campaign103|
|2015-04-27 14:52:...|167405|Campaign027|
|2015-04-27 22:08:...|167405|Campaign027|
|2015-04-27 20:11:...| 80524|Campaign054|
+--------------------+------+-----------+
only showing top 10 rows


### Usage for query with SQL expression
* To use SQL expressions, first register the DataFrame under a table name with `registerTempTable`.
* Sub-queries are supported, but each sub-query needs an alias; otherwise the query fails.
  * OK: `SELECT count(*) FROM (SELECT * FROM my_table limit 10) subquery_alias`
  * Fails: `SELECT count(*) FROM (SELECT * FROM my_table limit 10)`
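
The aliased sub-query form can be tried without a cluster. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Spark SQL, with a handful of made-up rows; the table and column names mirror the ones used in this notebook. SQLite is more lenient about missing aliases than the Spark SQL behaviour described above, so this only demonstrates the shape of the working query.

```python
import sqlite3

# Hypothetical sample rows (userID, access_time); not taken from the real log.
rows = [
    (1, "2015-04-27 10:00:00"),
    (1, "2015-04-29 09:30:00"),
    (2, "2015-04-28 08:15:00"),
    (3, "2015-04-27 23:59:59"),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE whole_log_table (userID INTEGER, access_time TEXT)")
conn.executemany("INSERT INTO whole_log_table VALUES (?, ?)", rows)

# Count the users whose first access happened before 2015-04-28.
# The inner query is given the alias "subquery_alias".
query = """
SELECT count(*) AS first_count
FROM (
    SELECT userID, min(access_time) AS first_access_date
    FROM whole_log_table
    GROUP BY userID
) subquery_alias
WHERE first_access_date < '2015-04-28'
"""
first_count = conn.execute(query).fetchone()[0]
conn.close()
print(first_count)
```

Users 1 and 3 have a first access on 2015-04-27, so they are counted; user 2 first appears on 2015-04-28 and is not.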

In [None]:
#Simple SQL query

whole_log_df.registerTempTable("whole_log_table")

print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").count()
sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").show(5)

#SQL query with variables
for count in range(1, 3):
    print "Campaign00" + str(count)
    sqlContext.sql("SELECT count(*) as access_num FROM whole_log_table where campaignID == 'Campaign00" + str(count) + "'").show()

#Sub-query: note that you must add an alias to the sub-query, otherwise it fails.
sqlContext.sql(" SELECT count(*) as first_count FROM (SELECT userID, min(access_time) as first_access_date FROM whole_log_table GROUP BY userID) subquery_alias WHERE first_access_date < '2015-04-28'").show(5)

18081
+--------------------+------+-----------+
|         access_time|userID| campaignID|
+--------------------+------+-----------+
|2015-04-27 05:26:...| 14151|Campaign047|
|2015-04-27 05:26:...| 14151|Campaign047|
|2015-04-27 05:26:...| 14151|Campaign047|
|2015-04-27 05:27:...| 14151|Campaign047|
|2015-04-27 05:28:...| 14151|Campaign047|
+--------------------+------+-----------+
only showing top 5 rows

Campaign001
+----------+
|access_num|
+----------+
|      2407|
+----------+

Campaign002
+----------+
|access_num|
+----------+
|      1674|
+----------+

+-----------+
|first_count|
+-----------+
|      20480|
+-----------+
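
The loop in the cell above builds the campaign ID by concatenating `"Campaign00"` with a number, which only works for single-digit campaigns. A small helper that zero-pads the number is one way to generalise it. This is a sketch under the assumption that every campaign ID follows the `CampaignNNN` pattern seen in the sample data; `campaign_count_query` is a hypothetical name, not part of Spark.

```python
def campaign_count_query(campaign_number, table="whole_log_table"):
    """Build the per-campaign count query for a zero-padded campaign ID."""
    # {:03d} only accepts integers, so arbitrary text cannot end up in the query.
    campaign_id = "Campaign{:03d}".format(campaign_number)
    return ("SELECT count(*) AS access_num FROM {table} "
            "WHERE campaignID = '{cid}'").format(table=table, cid=campaign_id)


for number in (1, 2, 47):
    print(campaign_count_query(number))
```

The resulting string could then be passed to `sqlContext.sql(...)` in the same way as the hand-built queries.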


### Usage for `filter` and `select`
* `filter` extracts the rows that satisfy a condition
* `select` extracts columns

In [None]:
#Sample for filter

print whole_log_df.filter(whole_log_df["access_time"] < "2015-04-28").count()
whole_log_df.filter(whole_log_df["access_time"] > "2015-05-01").show(5)

In [None]:
#Sample for select

whole_log_df.select("access_time", "userID").show(5)
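
For intuition, the row-wise and column-wise nature of `filter` and `select` can be mimicked in plain Python. This is only an analogy on a few made-up rows, not how Spark executes the operations, and the plain-Python version needs a real `datetime` as the cutoff where the cells above compare against a date string.

```python
import datetime

# Hypothetical rows, represented as dicts keyed by column name.
rows = [
    {"access_time": datetime.datetime(2015, 4, 27, 20, 40, 40),
     "userID": 144012, "campaignID": "Campaign077"},
    {"access_time": datetime.datetime(2015, 4, 27, 0, 27, 55),
     "userID": 24485, "campaignID": "Campaign063"},
    {"access_time": datetime.datetime(2015, 5, 2, 9, 0, 0),
     "userID": 1, "campaignID": "Campaign001"},
]

# Analogue of filter: keep only the rows that satisfy the condition.
cutoff = datetime.datetime(2015, 4, 28)
before_cutoff = [row for row in rows if row["access_time"] < cutoff]

# Analogue of select: keep every row, but only the named columns.
wanted = ("access_time", "userID")
selected = [{name: row[name] for name in wanted} for row in rows]

print(len(before_cutoff))
print(sorted(selected[0].keys()))
```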

### Usage for `groupBy + agg`
1. Example of **groupBy**, which is similar to **reduceByKey**
2. Example of **groupBy + agg**
  * The argument of `agg` should be a dict, `agg({key: value})`, where `key` is a column name and `value` is the name of an aggregate function (such as `min`, `sum`, `avg`).
  * The `agg` used here is a method of the `groupBy` output (a GroupedData object) and aggregates per group; calling `agg` directly on a DataFrame aggregates over the whole DataFrame without groups.

In [None]:
#Example(1) simple groupBy -> count(). You can set multiple keys like groupBy('key1', 'key2').
whole_log_df.groupBy('campaignID').count().sort('count', ascending=False).show(5)
whole_log_df.groupBy('campaignID', 'userID').count().sort('count', ascending=False).show(5)

In [None]:
#Example(2)
whole_log_df.groupBy('userID').agg({"access_time": "min"}).show(3)
whole_log_df.groupBy('userID').agg({"access_time": "min"}).printSchema()
print whole_log_df.groupBy('userID').agg({"access_time": "min"}).filter("min(access_time) < '2015-04-28'").count()
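
To make clear what the two examples compute, here is a plain-Python equivalent on a few made-up rows. It is a sketch for intuition only: it runs on a local list rather than a distributed DataFrame, and the row values are hypothetical.

```python
import collections
import datetime

# Hypothetical rows of (access_time, userID, campaignID); not real log data.
rows = [
    (datetime.datetime(2015, 4, 27, 0, 27, 55), 24485, "Campaign063"),
    (datetime.datetime(2015, 4, 27, 0, 28, 13), 24485, "Campaign063"),
    (datetime.datetime(2015, 4, 27, 0, 33, 0), 24485, "Campaign038"),
    (datetime.datetime(2015, 4, 27, 20, 40, 40), 144012, "Campaign077"),
]

# Like groupBy('campaignID').count().sort('count', ascending=False):
# count rows per campaign, most frequent first.
campaign_counts = collections.Counter(campaign for _, _, campaign in rows)
top_campaigns = campaign_counts.most_common()

# Like groupBy('userID').agg({"access_time": "min"}):
# keep the earliest access time seen for each user.
first_access = {}
for access_time, user_id, _ in rows:
    if user_id not in first_access or access_time < first_access[user_id]:
        first_access[user_id] = access_time

print(top_campaigns)
print(first_access)
```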

### UDF(User Defined Function)
* Spark DataFrames support UDFs; a typical use is to add a new column (or replace an existing one) with the result of a UDF.

In [None]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType

#Convert a timestamp into an integer such as 20150427
def add_day_column(access_time):
    return int(access_time.strftime('%Y%m%d'))
    
my_udf = UserDefinedFunction(add_day_column, IntegerType())
whole_log_df.withColumn("access_day", my_udf("access_time")).show(10)
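
Because `access_time` is declared nullable in the schema, a null value would reach the Python function as `None`, and `None.strftime(...)` raises an `AttributeError`. One possible guard is sketched below; `add_day_column_safe` is a hypothetical variant of the function above, and it can be checked locally without Spark.

```python
import datetime


def add_day_column_safe(access_time):
    """Return the date as an integer like 20150427, or None for a null input."""
    if access_time is None:
        return None
    return int(access_time.strftime("%Y%m%d"))


print(add_day_column_safe(datetime.datetime(2015, 4, 27, 20, 40, 40)))
print(add_day_column_safe(None))
```

It would be wrapped with `UserDefinedFunction(add_day_column_safe, IntegerType())` in the same way as the original function.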

### Convert from dataframe to another object
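* Convert to RDD: `map` converts a DataFrame to an RDD. On Spark 2.0 and later, PySpark DataFrames no longer have `map`; use `dataframe.rdd.map(...)` instead.
* Convert to a (plain) list: first convert to an RDD (with `map()`), then to a list (with `collect()`).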

In [None]:
#convert to rdd
print whole_log_df.groupBy('campaignID').count().map(lambda x: [x[0], x[1]]).take(5)

In [None]:
#convert to list
print whole_log_df.groupBy('campaignID').count().map(lambda x: [x[0], x[1]]).collect()[:5]
print len(whole_log_df.groupBy('campaignID').count().map(lambda x: [x[0], x[1]]).collect())

### Other

In [None]:
# columns returns the list of column names
print whole_log_df.columns