# Spark DataFrames & SQL - Basics

In [1]:
from __future__ import division

import pyspark

# SparkSession (Spark 2.0+) is the unified entry point; constructing SQLContext
# directly is deprecated since Spark 3.0. getOrCreate() reuses an active
# session, so re-running this cell does not fail with "Cannot run multiple
# SparkContexts at once" the way a bare pyspark.SparkContext() call does.
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# The rest of the notebook calls sqlContext.createDataFrame(...) and
# sqlContext.sql(...); a SparkSession provides both methods directly.
sqlContext = spark

## 1. Build Spark DataFrames

### 1.1 Build Spark DataFrames from Python Lists

In [2]:
# Build a tiny DataFrame from an in-memory Python list.
#   data:   the rows, one tuple per row
#   schema: the column names, in order
DT1 = sqlContext.createDataFrame(
    data=[(1, 2), (3, 4)],
    schema=["A", "B"],
)

DT1.show()

+---+---+
|  A|  B|
+---+---+
|  1|  2|
|  3|  4|
+---+---+



### 1.2 Build Spark DataFrames from RDD

In [3]:
# Peek at the raw file as bytes (use_unicode=False) to see the exact quoting.
sc.textFile(
    "/home/jovyan/dataset/2015-12-12.csv",
    use_unicode=False,
).take(num=2)

[b'"date","time","size","r_version","r_arch","r_os","package","version","country","ip_id"',
 b'"2015-12-12","13:42:10",257886,"3.2.2","i386","mingw32","HistData","0.7-6","CZ",1']

In [4]:
# Parse every line of the CSV file into a list of field strings, e.g.
#   "2015-12-12","13:42:10",257886,...  ->  ['2015-12-12', '13:42:10', '257886', ...]
# csv.reader strips the surrounding quotes and keeps commas that appear inside
# a quoted field. A naive replace('"', "") + split(",") would split such a
# field into extra columns.
import csv

dat = (
    sc.textFile("/home/jovyan/dataset/2015-12-12.csv", use_unicode=True)
    .mapPartitions(lambda lines: csv.reader(lines))
    # csv.reader yields [] for a blank line; drop those rows so later
    # row[0] lookups cannot raise IndexError.
    .filter(lambda fields: len(fields) > 0)
)

In [5]:
# First two parsed rows: the header row and the first data row.
dat.take(num=2)

[['date',
  'time',
  'size',
  'r_version',
  'r_arch',
  'r_os',
  'package',
  'version',
  'country',
  'ip_id'],
 ['2015-12-12',
  '13:42:10',
  '257886',
  '3.2.2',
  'i386',
  'mingw32',
  'HistData',
  '0.7-6',
  'CZ',
  '1']]

In [6]:
# The first line of the file holds the column names; every other line is data.
# dat.first() reads only the first partition. Filtering for rows whose first
# field is 'date' would scan the whole file and hard-code the first column's name.
header = dat.first()

DT2 = sqlContext.createDataFrame(data=dat.filter(lambda row: row != header),
                                 schema=header)

# DT2 is reused by many cells below, so cache it; the DataFrame repr is the cell output.
DT2.persist()

DataFrame[date: string, time: string, size: string, r_version: string, r_arch: string, r_os: string, package: string, version: string, country: string, ip_id: string]

In [14]:
# Debug helpers (disabled): preview data rows (first field != 'date') and the
# header row (first field == 'date').
#dat.filter(lambda x:x[0]!='date').take(10)
#dat.filter(lambda x:x[0]=='date').take(10)

In [8]:
# Under the hood, each record of the DataFrame is a pyspark Row object.
DT2.head(2)

[Row(date='2015-12-12', time='13:42:10', size='257886', r_version='3.2.2', r_arch='i386', r_os='mingw32', package='HistData', version='0.7-6', country='CZ', ip_id='1'),
 Row(date='2015-12-12', time='13:24:37', size='1236751', r_version='3.2.2', r_arch='x86_64', r_os='mingw32', package='RJSONIO', version='1.3-0', country='DE', ip_id='2')]

In [9]:
# Render the first 10 rows as a text table.
DT2.show(n=10)

+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
|      date|    time|   size|r_version|r_arch|   r_os|  package|version|country|ip_id|
+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
|2015-12-12|13:42:10| 257886|    3.2.2|  i386|mingw32| HistData|  0.7-6|     CZ|    1|
|2015-12-12|13:24:37|1236751|    3.2.2|x86_64|mingw32|  RJSONIO|  1.3-0|     DE|    2|
|2015-12-12|13:42:35|2077876|    3.2.2|  i386|mingw32|   UsingR|  2.0-5|     CZ|    1|
|2015-12-12|13:42:01| 266724|    3.2.2|  i386|mingw32|gridExtra|  2.0.0|     CZ|    1|
|2015-12-12|13:00:21|3687766|       NA|    NA|     NA|     lme4| 1.1-10|     DE|    3|
|2015-12-12|13:08:56|  57429|       NA|    NA|     NA| testthat| 0.11.0|     DE|    3|
|2015-12-12|13:08:09| 216068|    3.2.2|x86_64|mingw32|  mvtnorm|  1.0-3|     DE|    4|
|2015-12-12|13:25:00|3595497|    3.2.2|x86_64|mingw32|     maps|  3.0.1|     DE|    2|
|2015-12-12|13:25:05|1579597|    3.2.2|x86_

In [15]:
# DT2 is a pyspark.sql DataFrame, not an RDD.
DT2.__class__

pyspark.sql.dataframe.DataFrame

In [15]:
# Column names, read from the DataFrame's schema.
DT2.schema.names

['date',
 'time',
 'size',
 'r_version',
 'r_arch',
 'r_os',
 'package',
 'version',
 'country',
 'ip_id']

In [16]:
# (column name, type) pairs; every column is still a string at this point.
[(str(field.name), field.dataType.simpleString()) for field in DT2.schema.fields]

[('date', 'string'),
 ('time', 'string'),
 ('size', 'string'),
 ('r_version', 'string'),
 ('r_arch', 'string'),
 ('r_os', 'string'),
 ('package', 'string'),
 ('version', 'string'),
 ('country', 'string'),
 ('ip_id', 'string')]

## 2. Change DataFrames Properties

### 2.1 Change Column Type

Available types (from `pyspark.sql.types`) include
- BinaryType
- BooleanType
- ByteType
- DoubleType
- DateType
- FloatType
- IntegerType
- etc.

In [16]:
from pyspark.sql.types import IntegerType, DateType

In [17]:
# Use withColumn to replace columns with type-cast versions:
# "size" becomes an integer and "date" becomes a date.
DT3 = (
    DT2
    .withColumn("size", DT2["size"].cast(IntegerType()))
    .withColumn("date", DT2["date"].cast(DateType()))
)

In [18]:
# Confirm the casts: "date" and "size" now carry non-string types.
[(str(field.name), field.dataType.simpleString()) for field in DT3.schema.fields]

[('date', 'date'),
 ('time', 'string'),
 ('size', 'int'),
 ('r_version', 'string'),
 ('r_arch', 'string'),
 ('r_os', 'string'),
 ('package', 'string'),
 ('version', 'string'),
 ('country', 'string'),
 ('ip_id', 'string')]

In [19]:
# Preview the first 5 rows after casting.
DT3.show(n=5)

+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
|      date|    time|   size|r_version|r_arch|   r_os|  package|version|country|ip_id|
+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
|2015-12-12|13:42:10| 257886|    3.2.2|  i386|mingw32| HistData|  0.7-6|     CZ|    1|
|2015-12-12|13:24:37|1236751|    3.2.2|x86_64|mingw32|  RJSONIO|  1.3-0|     DE|    2|
|2015-12-12|13:42:35|2077876|    3.2.2|  i386|mingw32|   UsingR|  2.0-5|     CZ|    1|
|2015-12-12|13:42:01| 266724|    3.2.2|  i386|mingw32|gridExtra|  2.0.0|     CZ|    1|
|2015-12-12|13:00:21|3687766|       NA|    NA|     NA|     lme4| 1.1-10|     DE|    3|
+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
only showing top 5 rows



### 2.2 Change Column Name

In [20]:
# Rename a column with withColumnRenamed (existing name -> new name).
DT4 = DT2.withColumnRenamed(existing="size", new="size_new")

In [21]:
# Preview the first 5 rows with the renamed column.
DT4.show(n=5)

+----------+--------+--------+---------+------+-------+---------+-------+-------+-----+
|      date|    time|size_new|r_version|r_arch|   r_os|  package|version|country|ip_id|
+----------+--------+--------+---------+------+-------+---------+-------+-------+-----+
|2015-12-12|13:42:10|  257886|    3.2.2|  i386|mingw32| HistData|  0.7-6|     CZ|    1|
|2015-12-12|13:24:37| 1236751|    3.2.2|x86_64|mingw32|  RJSONIO|  1.3-0|     DE|    2|
|2015-12-12|13:42:35| 2077876|    3.2.2|  i386|mingw32|   UsingR|  2.0-5|     CZ|    1|
|2015-12-12|13:42:01|  266724|    3.2.2|  i386|mingw32|gridExtra|  2.0.0|     CZ|    1|
|2015-12-12|13:00:21| 3687766|       NA|    NA|     NA|     lme4| 1.1-10|     DE|    3|
+----------+--------+--------+---------+------+-------+---------+-------+-------+-----+
only showing top 5 rows



### 2.3 Order DataFrame by Specified Column(s)

In [22]:
# Ascending order: smallest size first.
DT3.orderBy(DT3["size"].asc()).show(10)

+----------+--------+----+---------+------+----+-----------+-------+-------+-----+
|      date|    time|size|r_version|r_arch|r_os|    package|version|country|ip_id|
+----------+--------+----+---------+------+----+-----------+-------+-------+-----+
|2015-12-12|20:20:28| 504|       NA|    NA|  NA|  orloca.es|    3.2|     CN| 2365|
|2015-12-12|20:17:22| 504|       NA|    NA|  NA|  financial|    0.1|     CN| 7321|
|2015-12-12|20:20:58| 504|       NA|    NA|  NA|poistweedie|    1.0|     CN|   74|
|2015-12-12|19:06:56| 504|       NA|    NA|  NA|httpRequest|  0.0.5|     CN| 1133|
|2015-12-12|19:09:58| 504|       NA|    NA|  NA|    polycor|  0.7-0|     CN| 1153|
|2015-12-12|20:34:41| 504|       NA|    NA|  NA|     merror|    1.0|     CN| 5337|
|2015-12-12|20:38:03| 504|       NA|    NA|  NA|    sddpack|    0.9|     CN|13071|
|2015-12-12|20:36:24| 504|       NA|    NA|  NA|      pheno|    1.5|     CN| 4943|
|2015-12-12|20:48:58| 504|       NA|    NA|  NA|    divagis|  1.0.0|     CN| 4901|
|201

In [23]:
# Descending order: largest size first.
DT3.orderBy(DT3["size"].desc()).show(10)

+----------+--------+--------+---------+------+------------+--------------------+-------+-------+-----+
|      date|    time|    size|r_version|r_arch|        r_os|             package|version|country|ip_id|
+----------+--------+--------+---------+------+------------+--------------------+-------+-------+-----+
|2015-12-12|02:27:31|68736865|    3.2.3|x86_64|     mingw32|           TCGA2STAT|    1.2|     US| 2700|
|2015-12-12|01:31:52|68736865|    3.1.3|x86_64|darwin10.8.0|           TCGA2STAT|    1.2|     US| 2700|
|2015-12-12|02:28:49|68736865|    3.3.0|x86_64|   linux-gnu|           TCGA2STAT|    1.2|     US| 2700|
|2015-12-12|02:28:30|68736865|    3.2.3|x86_64|     mingw32|           TCGA2STAT|    1.2|     US| 2700|
|2015-12-12|21:23:23|68736862|    3.2.3|x86_64|darwin13.4.0|           TCGA2STAT|    1.2|     US| 2700|
|2015-12-12|02:19:32|68736862|    3.2.0|  i386|     mingw32|           TCGA2STAT|    1.2|     US| 2700|
|2015-12-12|13:17:41|68736856|    3.2.3|x86_64|   linux-gnu|    

## 3. Filtering and Aggregation

In [24]:
# Fraction of rows whose size is below 1000.
(
    DT3.where(DT3["size"] < 1000).count()
    / DT3.count()
)

0.11161009458040756

In [25]:
# Fraction of rows that are ggplot2 downloads.
(
    DT3.where(DT3["package"] == "ggplot2").count()
    / DT3.count()
)

0.009273193054466087

In [27]:
# Count rows per package, then order the aggregated counts from largest to smallest.
DT3.groupBy("package").count().orderBy("count", ascending=False).show(10)

+------------+-----+
|     package|count|
+------------+-----+
|        Rcpp| 4783|
|     ggplot2| 3913|
|     stringi| 3748|
|     stringr| 3449|
|        plyr| 3436|
|    magrittr| 3265|
|      digest| 3223|
|    reshape2| 3205|
|RColorBrewer| 3046|
|      scales| 3007|
+------------+-----+
only showing top 10 rows



In [26]:
# Same aggregation as above, but keep the sorted per-package counts for reuse.
package_count = (
    DT3.groupBy("package")
       .count()
       .orderBy("count", ascending=False)
)

In [27]:
# Top 10 packages by row count.
package_count.show(n=10)

+------------+-----+
|     package|count|
+------------+-----+
|        Rcpp| 4783|
|     ggplot2| 3913|
|     stringi| 3748|
|     stringr| 3449|
|        plyr| 3436|
|    magrittr| 3265|
|      digest| 3223|
|    reshape2| 3205|
|RColorBrewer| 3046|
|      scales| 3007|
+------------+-----+
only showing top 10 rows



## 4. Transform A DataFrame Column (using UDF)

In [36]:
# Build a user-defined function (UDF) that turns a fraction into a percentage
# string rounded to 3 decimal places, e.g. 0.5 -> "50.0%".
# Python's round(x, n) keeps n digits after the decimal point.

from pyspark.sql.functions import udf


def _fraction_to_percent_text(fraction):
    """Return `fraction` as a percentage string with at most 3 decimals plus '%'."""
    percent = round(fraction * 100, 3)
    return str(percent) + "%"


# Wrapped without an explicit returnType, so the UDF returns strings.
derive_perc = udf(_fraction_to_percent_text)

# Apply the UDF through withColumn: each package's count divided by the total row count.
share_of_total = package_count["count"] / DT3.count()
package_count = package_count.withColumn("perc", derive_perc(share_of_total))

package_count.show(10)

+------------+-----+------+
|     package|count|  perc|
+------------+-----+------+
|        Rcpp| 4783|1.133%|
|     ggplot2| 3913|0.927%|
|     stringi| 3748|0.888%|
|     stringr| 3449|0.817%|
|        plyr| 3436|0.814%|
|    magrittr| 3265|0.774%|
|      digest| 3223|0.764%|
|    reshape2| 3205| 0.76%|
|RColorBrewer| 3046|0.722%|
|      scales| 3007|0.713%|
+------------+-----+------+
only showing top 10 rows



In [37]:
# Look up a single package's count and share.
package_count.where(package_count["package"] == 'DT').show()

+-------+-----+------+
|package|count|  perc|
+-------+-----+------+
|     DT|   97|0.023%|
+-------+-----+------+



## 5. Using Spark SQL
* 另外一種方式進行資料查詢

In [38]:
# Register the package_count DataFrame as a temporary view so that SQL
# queries can refer to it by name.
# A temporary view only lives as long as the SparkSession that created it.
package_count.createOrReplaceTempView(name="package_count_sql_table")

In [39]:
# Registering a view does not change the object: it is still a DataFrame.
package_count.__class__

pyspark.sql.dataframe.DataFrame

In [40]:
# Quick look at the first two rows backing the SQL view.
package_count.show(n=2)

+-------+-----+------+
|package|count|  perc|
+-------+-----+------+
|   Rcpp| 4783|1.133%|
|ggplot2| 3913|0.927%|
+-------+-----+------+
only showing top 2 rows



# 第一個簡單的SQL

In [41]:
# Look up the download share of the "DT" package through the temp view.
# A triple-quoted string keeps the SQL readable. A backslash-continued string
# literal breaks if whitespace follows a backslash, and it embeds the source
# indentation into the query text.
query_result = sqlContext.sql("""
    SELECT perc
    FROM package_count_sql_table
    WHERE package = 'DT'
""")

# Bare last expression: Jupyter displays the list of Rows without print().
query_result.collect()

[Row(perc='0.023%')]


# 第二個簡單的SQL

In [42]:
# Packages with more than 1000 rows, smallest count first.
# `count` is backquoted because it is also the name of a SQL aggregate function.
query_result = sqlContext.sql("""
    SELECT *
    FROM package_count_sql_table
    WHERE `count` > 1000
    ORDER BY `count` ASC
""")

# show() prints the table itself and returns None. Wrapping it in print()
# would print a stray "None" after the table.
query_result.show(5)

+-------+-----+------+
|package|count|  perc|
+-------+-----+------+
|   slam| 1006|0.238%|
|     sp| 1020|0.242%|
|  shiny| 1041|0.247%|
|  tidyr| 1042|0.247%|
|plotrix| 1066|0.253%|
+-------+-----+------+
only showing top 5 rows

None


# 別忘記了，查詢回來的結果，也可以透過 RDD 方式運算

In [43]:
# A SQL result is an ordinary DataFrame, so its underlying RDD supports RDD
# operations too: format each Row as "package:perc".
(
    query_result.rdd
    .map(lambda row: ":".join((row["package"], row["perc"])))
    .take(10)
)

['slam:0.238%',
 'sp:0.242%',
 'shiny:0.247%',
 'tidyr:0.247%',
 'plotrix:0.253%',
 'wordcloud:0.254%',
 'rgl:0.257%',
 'markdown:0.261%',
 'irlba:0.27%',
 'pkgmaker:0.27%']

In [44]:
# Shut down the SparkContext and release its resources; later cells cannot use `sc`.
sc.stop()