In [None]:
global Path  
Path="file:/home/spark/spark-workshop/"

### 解壓縮測試資料

In [None]:
! unzip /home/spark/spark-workshop/data.zip -d /home/spark/spark-workshop/

# 建立

### 建立RDD

In [None]:
RawUserRDD= sc.textFile(Path+"data/u.user")

In [None]:
RawUserRDD.count()

In [None]:
RawUserRDD.take(5)

In [None]:
userRDD =RawUserRDD.map(lambda line: line.split("|"))
userRDD.take(5)

### 建立DataFrame

由RDD產生：

In [None]:
from pyspark.sql import Row 
userDF1 = userRDD.map(lambda u: Row(userid=u[0], age=int(u[1]), gender=u[2], occupation=u[3], zipcode=u[4]))
schemaUser = spark.createDataFrame(userDF1)
schemaUser.show(5)

由讀入csv檔產生：

In [None]:
userDF = spark.read.option("delimiter","|").option("inferSchema","true").csv(Path+"data/u.user")
userDF.show(5)

In [None]:
user_df = userDF.toDF(
    'userid', 'age', 'gender', 'occupation','zipcode')
user_df.show(5)
user_df.printSchema()

In [None]:
df=user_df.alias("df")
df.show(5)

### Spark SQL tempTable

In [None]:
user_df.registerTempTable("user_table")

In [None]:
sqlContext.sql(" SELECT count(*) counts FROM user_table").show()

In [None]:
sqlDF = spark.sql(" SELECT count(*) counts FROM user_table")
sqlDF.show()

In [None]:
spark.sql(" SELECT *  FROM user_table ").show()

In [None]:
spark.sql(" SELECT *  FROM user_table").show(5)

In [None]:
spark.sql(" SELECT *  FROM user_table LIMIT 5").show()

# 顯示部分欄位

### RDD

In [None]:
userRDDnew= userRDD.map(lambda x: (x[0],x[3],x[2] ,x[1]) )
userRDDnew.take(5)

### DataFrame

In [None]:
user_df.select("userid","occupation","gender","age").show(5)

In [None]:
user_df.select( user_df.userid, user_df.occupation,user_df.gender,user_df.age ).show(5)

In [None]:
user_df[user_df['userid'],user_df['occupation'],user_df['gender'],user_df['age']  ].show(5)

### Spark SQL tempTable

In [None]:
sqlContext.sql(" SELECT userid,occupation,gender,age  FROM user_table").show(5)

# 篩選資料

### RDD

In [None]:
userRDD.filter(lambda r:   r[3]=='technician' and r[2]=='M' and r[1]=='24').take(6)    

### DataFrame

In [None]:
user_df.filter("occupation='technician' ").filter("gender='M' ").filter("age=24").show()

In [None]:
user_df.filter("occupation='technician' and gender='M' and age=24").show()

In [None]:
df.filter((df.occupation=='technician' ) & (df.gender=='M' ) & (df.age==24)).show()

In [None]:
df.filter((df['occupation']=='technician' ) & (df['gender']=='M' ) & (df['age']==24)).show()

### Spark SQL 

In [None]:
sqlContext.sql(
'''SELECT *  
FROM user_table 
where occupation='technician' and   gender='M' and age=24''').show(5)

# 排序

### RDD

In [None]:
#userRDD.sortBy(lambda x: int(x[1])).collect()
userRDD.sortBy(lambda x: int(x[1]), ascending=False).collect()

### Spark SQL

In [None]:
sqlContext.sql("""
SELECT userid,occupation,gender,age   
FROM user_table 
ORDER BY age""").show(5)

In [None]:
sqlContext.sql("""
SELECT userid,occupation,gender,age   
FROM user_table 
ORDER BY age DESC""").show(5)

### DataFrame

In [None]:
user_df.select("userid","occupation","gender","age").orderBy("age").show(5)

In [None]:
df.select("userid","occupation","gender","age").orderBy("age",ascending=0 ).show(5)

In [None]:
df.select("userid","occupation","gender","age").orderBy(df.age).show(5)

In [None]:
df.select("userid","occupation","gender","age").orderBy(df.age.desc()).show(5)

# 群組統計資料

### RDD

使用map/reduce概念

In [None]:
userRDD.map(lambda x: (x[2],1)) \
               .reduceByKey(lambda x,y: x+y).collect()

In [None]:
userRDD.map(lambda x: ((x[2],x[3]),1)).reduceByKey(lambda x,y: x+y).collect()

### Spark SQL

In [None]:
sqlContext.sql(""" 
SELECT gender ,count(*)  counts  
FROM   user_table 
GROUP BY gender""").show()

In [None]:
sqlContext.sql(""" 
SELECT gender,occupation,count(*) counts  
FROM   user_table 
GROUP BY gender,occupation 
""").show(100)

### DataFrame

In [None]:
user_df.select("gender")      \
            .groupby("gender")  \
            .count().show()  

In [None]:
user_df.select("gender","occupation").                 \
                groupby("gender","occupation").           \
                count().                                                   \
                orderBy("gender","occupation").           \
                show(100)

# Join

###  DataFrame

In [None]:
zipcodeDF = spark.read.option("header","true").option("inferSchema","true").csv(Path+"data/free-zipcode-database-Primary.csv")
zipcode_df = zipcodeDF.select("Zipcode","ZipCodeType","City","State")
zipcode_df.show(5)
zipcode_df.printSchema()

In [None]:
joined_df=user_df.join(zipcode_df ,  \
    user_df.zipcode == zipcode_df.Zipcode, "left_outer")

joined_df.printSchema()    

In [None]:
joined_df.show(10)

In [None]:
joined_df.filter("state='NY' ").show(10)

In [None]:
GroupByState_df=joined_df.groupBy("state").count()
GroupByState_df.show(60)

### Spark SQL

In [None]:
zipcode_df.registerTempTable("zipcode_table")
zipcode_df.show(10)

In [None]:
sqlContext.sql(""" 
SELECT z.* 
FROM zipcode_table z 
""").show(10)

In [None]:
sqlContext.sql(""" 
SELECT u.* ,z.city,z.state 
FROM user_table u 
LEFT JOIN zipcode_table z  ON u.zipcode = z.zipcode
WHERE z.state='NY'
""").show(10)

In [None]:
sqlContext.sql(""" 
SELECT z.state ,count(*)
FROM user_table u
LEFT JOIN zipcode_table z  ON u.zipcode = z.zipcode
GROUP BY z.state 
""").show(60)

# 以Pandas DataFrame繪圖

In [None]:
import pandas as pd
GroupByState_pandas_df =GroupByState_df.toPandas().set_index('state')
GroupByState_pandas_df

In [None]:
GroupByState_pandas_df.T

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
ax = GroupByState_pandas_df ['count']  \
            .plot(kind='bar', title ="State ",figsize=(12,6),legend=True, fontsize=12)
plt.show()

In [None]:
Occupation_df=sqlContext.sql(""" 
SELECT u.occupation ,count(*) counts
FROM user_table u
GROUP BY occupation
""")
Occupation_df.show(30)

In [None]:
Occupation_pandas_df =Occupation_df.toPandas().set_index('occupation')
Occupation_pandas_df

In [None]:
ax =Occupation_pandas_df['counts'].plot(kind='pie',
           title ="occupation",figsize=(8,8),startangle=90,autopct='%1.1f%%')
ax.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
plt.show()

# Lab3: Apache Log Analysis
Ref: [Web Server Log Analysis with Apache Spark](http://datascience-enthusiast.com/Python/DataFrame_apache_log.html)

分別使用RDD、dataframe、spark sql分析下列三種情況：
1. 算出status為304共有幾筆
2. 在status為304的log裡，找出不同path的count
3. 在status為304的log裡，找出不同host的count

[Lab3-1: Spark SQL](../labs/Lab3-1-SQL.ipynb)

[Lab3-2: DataFrame](../labs/Lab3-2-DataFrame.ipynb)

[Lab3-3: RDD](../labs/Lab3-3-RDD.ipynb)