## Start a PySpark session

In [None]:
from pyspark.sql import SparkSession
# Build (or reuse) the session; "name" and ('config', 'value') look like
# placeholders for a real application name and Spark setting.
spark = SparkSession.builder.appName("name").config('config','value').getOrCreate()
# The RDD examples below call `sc`, which only the pyspark shell predefines;
# take it from the session so the notebook also runs on its own.
sc = spark.sparkContext

## Load Data
The most common method used to load data is textFile. This method takes a URI for the file (a local path or another URI such as hdfs://) and reads the data in as a collection of lines.

In [None]:
# load the data: textFile gives an RDD with one string element per line
raw_content = sc.textFile('2015-12-12.csv')

# print the type of the object
# (an RDD has no `dtypes` attribute -- that belongs to DataFrames -- so the
# former `raw_content.dtypes` call raised AttributeError and is not used here)
type(raw_content)

# print the number of lines
raw_content.count()
421970

## Show the first n rows

In [None]:
# show the first 5 rows (take(n) hands back a plain Python list of n elements)
raw_content.take(5)

[u'"date","time","size","r_version","r_arch","r_os","package","version","country","ip_id"',
 u'"2015-12-12","13:42:10",257886,"3.2.2","i386","mingw32","HistData","0.7-6","CZ",1',
 u'"2015-12-12","13:24:37",1236751,"3.2.2","x86_64","mingw32","RJSONIO","1.3-0","DE",2',
 u'"2015-12-12","13:42:35",2077876,"3.2.2","i386","mingw32","UsingR","2.0-5","CZ",1',
 u'"2015-12-12","13:42:01",266724,"3.2.2","i386","mingw32","gridExtra","2.0.0","CZ",1']

# show the first row (for this file, the CSV header line)
raw_content.first()

u'"date","time","size","r_version","r_arch","r_os","package","version","country","ip_id"'

# take random samples: 5 elements, drawn with replacement, with a fixed seed
# (RDD.takeSample names its parameters withReplacement, num and seed)
raw_content.takeSample(withReplacement=True, num=5, seed=3)

[u'"2015-12-12","16:41:22",18773,"3.2.3","x86_64","mingw32","evaluate","0.8","US",10935',
 u'"2015-12-12","13:06:32",494138,"3.2.3","x86_64","linux-gnu","rjson","0.2.15","KR",655',
 u'"2015-12-12","03:50:05",140207,NA,NA,NA,"SACOBRA","0.7","DE",129',
 u'"2015-12-12","21:40:13",622505,"3.2.3","x86_64","linux-gnu","stratification","2.2-5","US",4860',
 u'"2015-12-12","23:52:06",805204,"3.2.2","x86_64","mingw32","readxl","0.1.0","CA",104']

## Transformation

In [None]:
# Split every raw line on commas so each record becomes a list of fields.
# NOTE(review): a plain split would also cut inside a quoted field that
# itself contains a comma -- confirm the data has none.
content = raw_content.map(lambda line: line.split(','))
# Peek at the header plus the first two data rows.
content.take(3)

[
[u'"date"', u'"time"', u'"size"', u'"r_version"', u'"r_arch"', u'"r_os"', u'"package"', u'"version"', u'"country"', u'"ip_id"'], 
[u'"2015-12-12"', u'"13:42:10"', u'257886', u'"3.2.2"', u'"i386"', u'"mingw32"', u'"HistData"', u'"0.7-6"', u'"CZ"', u'1'], 
[u'"2015-12-12"', u'"13:24:37"', u'1236751', u'"3.2.2"', u'"x86_64"', u'"mingw32"', u'"RJSONIO"', u'"1.3-0"', u'"DE"', u'2']
]

def clean(x):
    """Return a new list holding each field of ``x`` with all double quotes removed."""
    return [field.replace('"', '') for field in x]
# strip the quotes from every record, rebinding `content` to the cleaned RDD
content = content.map(clean)
# show the header row plus the first three data rows
content.take(4)

[
[u'date', u'time', u'size', u'r_version', u'r_arch', u'r_os', u'package', u'version', u'country', u'ip_id'], 
[u'2015-12-12', u'13:42:10', u'257886', u'3.2.2', u'i386', u'mingw32', u'HistData', u'0.7-6', u'CZ', u'1'], 
[u'2015-12-12', u'13:24:37', u'1236751', u'3.2.2', u'x86_64', u'mingw32', u'RJSONIO', u'1.3-0', u'DE', u'2'], 
[u'2015-12-12', u'13:42:35', u'2077876', u'3.2.2', u'i386', u'mingw32', u'UsingR', u'2.0-5', u'CZ', u'1']
]

In [None]:
# map vs. flatMap: map yields exactly one output element per input element,
# so every sentence turns into its own list of words
text = ["a b c", "d e", "f g h"]
sc.parallelize(text).map(lambda sentence: sentence.split(" ")).collect()
[['a', 'b', 'c'], ['d', 'e'], ['f', 'g', 'h']]

# flatMap flattens the per-sentence word lists into one flat collection of words
sc.parallelize(text).flatMap(lambda sentence: sentence.split(" ")).collect()
['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']

## reduce and counting

In [None]:
# Count the records per package (field 6 of each record). Conceptually:
# step 1: emit a (package, 1) pair per record:  a -> (a,1), b -> (b,1)
# step 2: the pairs are gathered by key:        (a, [1,1,1]), (b, [1,1,1,1,1,1])
# step 3: the 1s of each key are summed:        (a,3), (b,6)
package_count = (
    content
    .map(lambda row: (row[6], 1))
    .reduceByKey(lambda left, right: left + right)
)
type(package_count)
<class 'pyspark.rdd.PipelinedRDD'>

package_count.count()
8660

package_count.take(5)
[(u'SIS', 24), 
(u'StatMethRank', 15), 
(u'dbmss', 54), 
(u'searchable', 14), 
(u'RcmdrPlugin.TextMining', 3)]

In [None]:
# countByKey is an alternative way to count: instead of another RDD it
# returns a hashmap-like object (a defaultdict, as the type check shows)
package_count_2 = (
    content
    .map(lambda row: (row[6], 1))
    .countByKey()
)
type(package_count_2)
<type 'collections.defaultdict'>

package_count_2['ggplot2']
3913

package_count_2['stm']
25

In [None]:
# Please note that countByKey method ONLY works on RDDs of type (K,V), returning a hashmap of (K,int) pairs
# with the COUNT of each key [1]. And the value of V will NOT affect the result.
# The package is named 'ggplot2'; looking up 'ggplot' in the defaultdict would
# silently give the default instead of the count shown.
package_count_2 = content.map(lambda x:(x[6],1)).countByKey()
package_count_2['ggplot2']
3913

package_count_2 = content.map(lambda x:(x[6],3)).countByKey()
package_count_2['ggplot2']
3913

package_count_2 = content.map(lambda x:(x[6],'test')).countByKey()
package_count_2['ggplot2']
3913

## Sorting

In [None]:
# Rank the packages by how many downloads they have. sortByKey sorts on the
# 'key', i.e. the first element of each pair, so (package, count) is swapped
# into (count, package) first.
# ascending=False (same as passing 0) sorts in descending order;
# ascending=True (same as passing 1) sorts in ascending order.
# Sort in descending order and get the first 10 rows.
package_count.map(lambda pair: (pair[1], pair[0])).sortByKey(ascending=False).take(10)
[(4783, u'Rcpp'),
 (3913, u'ggplot2'),
 (3748, u'stringi'),
 (3449, u'stringr'),
 (3436, u'plyr'),
 (3265, u'magrittr'),
 (3223, u'digest'),
 (3205, u'reshape2'),
 (3046, u'RColorBrewer'),
 (3007, u'scales')]

# sort in ascending order and get the first 10 rows
package_count.map(lambda pair: (pair[1], pair[0])).sortByKey(ascending=True).take(10)
 [(1, u'TSjson'),
 (1, u'ebayesthresh'),
 (1, u'parspatstat'),
 (1, u'gppois'),
 (1, u'JMLSD'),
 (1, u'kBestShortestPaths'),
 (1, u'StVAR'),
 (1, u'mosaicManip'),
 (1, u'em2'),
 (1, u'DART')]

In [None]:
# Other than sorting by key (normally it's the first element in each observation),
# we can also specify by which element to sort using method sortBy
package_count.sortBy(lambda x:x[1]).take(5) #default ascending is True
[(u'TSjson', 1),
 (u'ebayesthresh', 1),
 (u'parspatstat', 1),
 (u'gppois', 1),
 (u'JMLSD', 1)]

# same sort on the count (second element), largest first
package_count.sortBy(lambda pair: pair[1], ascending=False).take(5)
[(u'Rcpp', 4783),
 (u'ggplot2', 3913),
 (u'stringi', 3748),
 (u'stringr', 3449),
 (u'plyr', 3436)]

## Filter

We can think of filter as the SELECT * FROM table WHERE ... statement in SQL. It returns a new dataset made up of the elements of the source for which the user-supplied function returns true.

For example, to obtain the download records of the R package "Rtts" from China (CN), the condition is "package == 'Rtts' AND country == 'CN'".

In [None]:
# count the records of package 'Rtts' (field 6) that come from country 'CN' (field 8)
content.filter(lambda row: row[6] == 'Rtts' and row[8] == 'CN').count()
1

# fetch one matching record; the country test must index the row (x[8]),
# because a bare list [8] never equals 'CN' and would filter everything out
content.filter(lambda x:x[6] == 'Rtts' and x[8] == 'CN').take(1)
[[u'2015-12-12', u'20:15:24', u'23820', u'3.2.2', u'x86_64', u'mingw32', u'Rtts', u'0.3.3', u'CN', u'41']]

# the result of take() has already become a plain Python list, not an RDD
# (the country test indexes the row with x[8]; a bare [8] never equals 'CN')
temp = content.filter(lambda x:x[6] == 'Rtts' and x[8] == 'CN').take(1)
type(temp)
<type 'list'>

## Set operation

Like the set operators in Oracle SQL, we can do set operations in Spark. Here we introduce union, intersection, and distinct. They can be interpreted intuitively as follows.

union of A and B: return the elements of A together with the elements of B (duplicates are kept).
intersection of A and B: return the elements that exist in both A and B.
distinct of A: return the distinct values in A. That is, if an element appears more than once, it appears only once in the result.

In [None]:
# One point to note: if each element of the data is an array (list) instead of a string,
# the intersection and distinct methods can't work properly
# (presumably because lists are not hashable -- TODO confirm).
# This is why raw_content is used here as the example instead of content.
raw_content.count()
421970

# union keeps duplicates, so the count is twice that of raw_content
raw_content.union(raw_content).count()
843940

# intersection returns each common line only once
raw_content.intersection(raw_content).count()
421553

# distinct drops duplicate lines (the method is spelled `distinct`)
raw_content.distinct().count()
421553

## Join

Once again, the data processing methods in Spark turn out to be quite similar to those in SQL: for example, Spark has a join method, which is great news! Outer joins are also supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin [1]. Additionally, cartesian is available as well (please note that Spark SQL is available for similar purposes and would be preferred and recommended).

When called on datasets of type (K, V) and (K, W), join returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key [1].

In [None]:
# Key every record by its country code (field 8) so it can be joined.
content_modified = content.map(lambda row: (row[8], row))

# Lookup table from country abbreviation to full name, for four countries only.
mapping = sc.parallelize([
    ('DE', 'Germany'),
    ('US', 'United States'),
    ('CN', 'China'),
    ('IN', "India"),
])

# Inner join on the country code, then sample 8 joined records without replacement.
content_modified.join(mapping).takeSample(False, 8)
[
(u'CN', ([u'2015-12-12', u'19:26:01', u'512', u'NA', u'NA', u'NA', u'reweight', u'1.01', u'CN', u'4721'], 'China')), 
(u'US', ([u'2015-12-12', u'18:15:11', u'14271399', u'3.2.1', u'x86_64', u'mingw32', u'stringi', u'1.0-1', u'US', u'11837'], 'United States')), 
(u'US', ([u'2015-12-12', u'00:03:27', u'392370', u'3.2.3', u'x86_64', u'linux-gnu', u'colorspace', u'1.2-6', u'US', u'12607'], 'United States')), 
(u'US', ([u'2015-12-12', u'05:10:29', u'290932', u'3.2.2', u'x86_64', u'mingw32', u'iterators', u'1.0.8', u'US', u'5656'], 'United States')), 
(u'US', ([u'2015-12-12', u'22:28:47', u'2143454', u'3.2.3', u'x86_64', u'linux-gnu', u'quantreg', u'5.19', u'US', u'16318'], 'United States')), 
(u'US', ([u'2015-12-12', u'13:12:26', u'985806', u'3.2.3', u'x86_64', u'linux-gnu', u'plotly', u'2.0.3', u'US', u'2570'], 'United States')), 
(u'CN', ([u'2015-12-12', u'17:04:44', u'178399', u'3.2.1', u'x86_64', u'darwin13.4.0', u'apsrtable', u'0.8-8', u'CN', u'41'], 'China')), 
(u'US', ([u'2015-12-12', u'06:41:09', u'76007', u'3.2.3', u'i386', u'mingw32', u'superpc', u'1.09', u'US', u'1985'], 'United States'))
]

# left outer join.
# The mapping table only covers four countries, so records from any other
# country carry None in place of the country name in the result below.
content_modified.leftOuterJoin(mapping).takeSample(False, 8)
[
(u'US', ([u'2015-12-12', u'15:43:03', u'153892', u'3.2.2', u'i386', u'mingw32', u'gridBase', u'0.4-7', u'US', u'8922'], 'United States')), 
(u'CN', ([u'2015-12-12', u'19:59:37', u'82833', u'3.2.3', u'x86_64', u'mingw32', u'rgcvpack', u'0.1-4', u'CN', u'41'], 'China')), 
(u'JP', ([u'2015-12-12', u'17:24:59', u'2677787', u'3.2.3', u'i386', u'mingw32', u'ggplot2', u'1.0.1', u'JP', u'3597'], None)), 
(u'TN', ([u'2015-12-12', u'13:40:13', u'1229084', u'3.2.2', u'x86_64', u'mingw32', u'forecast', u'6.2', u'TN', u'10847'], None)), 
(u'US', ([u'2015-12-12', u'05:09:59', u'75327', u'3.2.3', u'x86_64', u'mingw32', u'xml2', u'0.1.2', u'US', u'5530'], 'United States')), 
(u'AE', ([u'2015-12-12', u'14:23:56', u'695625', u'3.1.2', u'i386', u'mingw32', u'mbbefd', u'0.7', u'AE', u'556'], None)), 
(u'KR', ([u'2015-12-12', u'16:31:34', u'36701', u'3.2.3', u'x86_64', u'linux-gnu', u'ttScreening', u'1.5', u'KR', u'4986'], None)), 
(u'US', ([u'2015-12-12', u'15:43:08', u'35212', u'3.2.2', u'x86_64', u'mingw32', u'reshape2', u'1.4.1', u'US', u'8922'], 'United States'))]

## Queries

In [None]:
from pyspark.sql import functions as F

In [None]:
# select
# NOTE(review): `df` is not created in this section -- presumably a DataFrame
# with firstName, lastName, age and phoneNumber columns; confirm.
# show all entries in firstName column
df.select("firstName").show()
df.select("firstName",'lastName').show()

# show all entries in firstName, age and type
# (explode is reached through the `F` alias, the only name imported from
# pyspark.sql.functions; a bare `explode` would raise NameError)
df.select("firstName",'age',F.explode("phoneNumber").alias("contactInfo")).select("contactInfo.type","firstName",'age').show()

# show all entries in firstName and age, add 1 to the entries of age
df.select(df['firstName'],df['age']+1).show()

# show, for every row, whether age > 24 as a boolean column
# (selecting a predicate does not drop rows; df.filter(df['age'] > 24)
# would keep only the matching rows)
df.select(df['age']>24).show()

In [None]:
# when
# show firstName and a column that is 1 when age > 30 and 0 otherwise
df.select("firstName",F.when(df.age > 30,1).otherwise(0)).show()

# collect the rows whose firstName is one of the given options
df[df.firstName.isin('Jane','Boris')].collect()

In [None]:
# like
# show firstName plus a boolean column that is True where lastName matches
# the SQL LIKE pattern 'Smith' (no wildcards are used in the pattern)
df.select("firstName",df.lastName.like('Smith')).show()
