# 15688 Tutorial: Introduction to PySpark


## Introduction
This tutorial introduces the basic concepts of PySpark, with practical examples in Python code. You have probably heard of big data: the growing volume and complexity of data calls for new technologies, and the ability to handle big data has become essential for companies in every industry. PySpark is one of the most widely used technologies in this area, so this tutorial offers a PySpark 101. The package used throughout is pyspark.

## Tutorial Agenda

- Apache Spark Architecture
- The Key Data Object - RDD
- Operations on RDD
- The Key Data Object - DataFrame
- Operations on DataFrame
- PySpark SQL

## Apache Spark Architecture

Spark achieves high-speed data processing through distributed computation. Its high-level architecture is driver-worker: one node (the driver) controls the other processes (the workers). The driver decides which computational work should be done on which worker. The job is distributed to the workers (map), and their partial results are combined to produce the final output (reduce). In this tutorial we will use PySpark, the Python API for Spark, which makes Spark programs easy to write in Python.
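The flow can be illustrated with a minimal plain-Python model. This is not Spark itself. A hypothetical driver() function splits a list into partitions, and a thread pool stands in for the workers, each of which squares and sums its own partition. The driver then adds up the partial results. The helper names and the square-and-sum job are assumptions chosen only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

def split_into_partitions(data, num_partitions):
    """Driver side: cut the data into contiguous, roughly equal slices."""
    n = len(data)
    return [data[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

def worker_task(partition):
    """Worker side: map each element (square it), then reduce locally (sum)."""
    return sum(x * x for x in partition)

def driver(data, num_partitions=4):
    partitions = split_into_partitions(data, num_partitions)
    # Each partition is handed to a separate "worker" thread.
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        partial_results = list(pool.map(worker_task, partitions))
    # The driver combines the partial results into the final answer.
    return reduce(lambda a, b: a + b, partial_results, 0)

print(driver(list(range(1, 11))))  # sum of squares of 1..10 -> 385
```

Real Spark workers are separate processes, usually on different machines. Threads are used here only to keep the sketch self-contained.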

First of all, we create a SparkContext object. It is the entry point to Spark, and it tells the program how to access a cluster (a driver and its workers).

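```python
import sys
# Make the Spark installation on this cluster importable (these paths are specific to this environment)
sys.path.append("/opt/packages/spark/latest/python/lib/py4j-0.10.9-src.zip")
sys.path.append("/opt/packages/spark/latest/python/")
sys.path.append("/opt/packages/spark/latest/python/pyspark")
from pyspark import SparkConf, SparkContext

# getOrCreate() reuses an already running SparkContext instead of failing on a second one;
# "pyspark-tutorial" is only an illustrative application name
conf = SparkConf().setAppName("pyspark-tutorial")
sc = SparkContext.getOrCreate(conf=conf)
sc
```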



## The Key Data Object - RDD

SparkContext can be used to create an RDD (Resilient Distributed Dataset), the fundamental data structure of Spark. An RDD has three key properties: it is immutable, distributed, and fault-tolerant.

- Immutable: once you create an RDD, you cannot change it. Transformations produce new RDDs instead.
- Distributed: an RDD is divided into partitions spread across a cluster. You can set the number of partitions when you create an RDD. If you do not, Spark chooses a default based on the cluster configuration. Changing the count later (for example with repartition()) produces a new RDD rather than modifying the existing one. More partitions allow more parallelism, as long as there are enough cores to process them.
- Fault-tolerant: if a worker crashes, Spark recomputes the lost partitions from the RDD's lineage (the recorded sequence of operations that built it), without affecting the other workers.

Apart from these properties, you can think of an RDD as a list in Python.
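Immutability and lineage-based recovery can be seen in a toy plain-Python model. This is not Spark's implementation. The hypothetical ToyRDD class stores only its source partitions and the chain of functions applied to them. map() returns a new object rather than changing the old one, and any partition can be rebuilt by replaying the lineage on its source slice. The class and method names are assumptions for illustration.

```python
def split_into_partitions(data, num_partitions):
    """Cut a list into contiguous, roughly equal partitions."""
    n = len(data)
    return [data[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

class ToyRDD:
    """Stores only its lineage: the source partitions plus the function chain."""

    def __init__(self, source_partitions, funcs=()):
        self._source = source_partitions
        self._funcs = tuple(funcs)  # a tuple, so the recorded lineage cannot be mutated

    def map(self, f):
        # A transformation returns a NEW ToyRDD with one more lineage step.
        return ToyRDD(self._source, self._funcs + (f,))

    def compute_partition(self, i):
        # Rebuild partition i from its source slice by replaying the lineage,
        # which is how a partition lost with a failed worker can be recovered.
        out = list(self._source[i])
        for f in self._funcs:
            out = [f(x) for x in out]
        return out

    def num_partitions(self):
        return len(self._source)

    def collect(self):
        return [x for i in range(self.num_partitions()) for x in self.compute_partition(i)]

base = ToyRDD(split_into_partitions([1, 2, 3, 4, 5, 6], 3))
doubled = base.map(lambda x: 2 * x)
print(base.collect())                # [1, 2, 3, 4, 5, 6]  (unchanged)
print(doubled.collect())             # [2, 4, 6, 8, 10, 12]
print(doubled.compute_partition(1))  # [6, 8]  (rebuilt from lineage)
```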

There are two ways to create an RDD. The first is to parallelize a Python collection, such as a list (see, RDDs and lists really are similar!). The second is to load data from files.

```python
# parallelizing
data1 = ['I', 'like', 'practical', 'data', 'science', 'like', 'lot', 'a', 'good', 'good']
rdd1 = sc.parallelize(data1, 4)  # explicitly request 4 partitions

data2 = [1, 2, 3, 4]
rdd2 = sc.parallelize(data2)  # let Spark choose the number of partitions
print("number of partitions: " + str(rdd2.getNumPartitions()))
```


## Operations on RDD

There are two kinds of operations on an RDD: transformations and actions. Transformations are lazy, which means they are not computed until an action needs their result. Another useful method is cache(). Caching an RDD that will be reused keeps it in memory after it is first computed. Later actions then neither recompute it nor read it from disk again.
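Lazy evaluation and caching can be modelled in a few lines of plain Python. This is not Spark code. The hypothetical LazyDataset class records each transformation instead of running it. Only the collect() action executes the recorded steps, and the runs counter shows how often that happens. Once cache() is set, the first action fills the cache and later actions reuse it. The class, its runs attribute and its method names are assumptions for illustration.

```python
class LazyDataset:
    """Toy model: transformations are recorded; only the collect() action executes them."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)
        self._use_cache = False
        self._cached = None
        self.runs = 0  # how many times the pipeline actually executed

    def map(self, f):
        return LazyDataset(self._data, self._steps + (("map", f),))

    def filter(self, pred):
        return LazyDataset(self._data, self._steps + (("filter", pred),))

    def cache(self):
        self._use_cache = True  # like Spark, marking for caching computes nothing yet
        return self

    def collect(self):
        if self._use_cache and self._cached is not None:
            return list(self._cached)  # served from memory, no recomputation
        self.runs += 1
        out = self._data
        for kind, f in self._steps:
            out = [f(x) for x in out] if kind == "map" else [x for x in out if f(x)]
        if self._use_cache:
            self._cached = out
        return list(out)

ds = LazyDataset([1, 2, 3, 4]).map(lambda x: x * 10).filter(lambda x: x > 15)
print(ds.runs)       # 0: nothing has been computed yet
print(ds.collect())  # [20, 30, 40]
ds.cache()
ds.collect()
ds.collect()
print(ds.runs)       # 2: one run before cache(), one run to fill the cache
```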

The basic transformations include map(), filter(), distinct(), and flatMap(). Except for distinct(), each takes a function as its input.

- map() returns a new RDD formed by passing each element through the function.
- filter() returns a new RDD containing only the elements for which the function returns True.
- distinct() returns the distinct elements of the original dataset.
- flatMap() is similar to map(), but each input element can produce zero or more output elements, so the mapping is not restricted to one-to-one.

Let's see some examples.
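To show exactly what each transformation returns, here are plain-Python equivalents applied to the same data1 and data2 lists used in the cells below. They use list comprehensions and itertools.chain, not Spark. The lists should match what collect() returns for the corresponding Spark calls, with one caveat. Spark does not guarantee the order of distinct()'s output, whereas dict.fromkeys() here keeps the first-seen order.

```python
from itertools import chain

data1 = ['I', 'like', 'practical', 'data', 'science', 'like', 'lot', 'a', 'good', 'good']
data2 = [1, 2, 3, 4]

mapped = [x + 's' for x in data1]              # map: exactly one output per input
filtered = [x for x in data2 if x <= 2]        # filter: keep elements where the function is True
distinct = list(dict.fromkeys(data1))          # distinct: drop duplicates (order kept only in this model)
nested = [[x, x + 1] for x in data2]           # map with a list-returning function
flat = list(chain.from_iterable([x, x + 1] for x in data2))  # flatMap: flatten each returned list

print(filtered)  # [1, 2]
print(nested)    # [[1, 2], [2, 3], [3, 4], [4, 5]]
print(flat)      # [1, 2, 2, 3, 3, 4, 4, 5]
```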

```python
# map: append 's' to every word
mapRDD = rdd1.map(lambda x: x + 's')
print(mapRDD.collect())
```

```python
# filter: keep only the elements <= 2
print(rdd2.filter(lambda x: x <= 2).collect())
```

```python
# distinct: remove duplicate words
print(rdd1.distinct().collect())
```

Here we can see the difference between map() and flatMap().

```python
# flatMap vs. map
print(rdd2.map(lambda x: [x, x + 1]).collect())      # one list per element
print(rdd2.flatMap(lambda x: [x, x + 1]).collect())  # the lists are flattened into one
```

You may be puzzled by collect(). Remember that transformations are lazy: they are not executed until an action runs on them. collect() is an action that returns all the elements of an RDD to the driver as a Python list. Other common actions are reduce(function), take(n), and takeOrdered(n, key=function).

- reduce(function): the function takes two arguments and returns one. It is widely used for simple aggregations such as sums. The function should be commutative and associative, because Spark combines partial results from different partitions in no fixed order.
- take(n): returns the first n elements.
- takeOrdered(n, key=function): returns the n smallest elements in ascending order, and the key argument lets you define your own sort key. To get descending order, use a small trick: negate the key.
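These actions can be mimicked in plain Python with functools.reduce, list slicing and heapq.nsmallest. This is an illustration, not Spark's implementation. The hypothetical helper partitioned_reduce() first reduces each partition, then combines the partial results. It shows why the function passed to reduce() must be associative and commutative: addition gives the same answer however the data is split, but subtraction does not.

```python
import heapq
from functools import reduce
from operator import add, sub

data1 = ['I', 'like', 'practical', 'data', 'science', 'like', 'lot', 'a', 'good', 'good']
data2 = [1, 2, 3, 4]

total = reduce(add, data2)                                 # like reduce(lambda x, y: x + y)
first_two = data1[:2]                                      # like take(2)
smallest_two = heapq.nsmallest(2, data2)                   # like takeOrdered(2)
largest_two = heapq.nsmallest(2, data2, key=lambda x: -x)  # negated key -> descending
print(total, first_two, smallest_two, largest_two)  # 10 ['I', 'like'] [1, 2] [4, 3]

def partitioned_reduce(f, partitions):
    # Reduce inside each partition, then reduce the partial results,
    # roughly the way a distributed reduce combines work from workers.
    return reduce(f, [reduce(f, p) for p in partitions])

print(partitioned_reduce(add, [[1, 2], [3, 4]]), partitioned_reduce(add, [[1], [2, 3, 4]]))  # 10 10
print(partitioned_reduce(sub, [[1, 2], [3, 4]]), partitioned_reduce(sub, [[1], [2, 3, 4]]))  # 0 6
```

Negating the key only works for numbers. For strings, -1 * x evaluates to an empty string, so every key is equal and the "descending" order is meaningless.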

```python
# reduce: sum all elements
print(rdd2.reduce(lambda x, y: x + y))
```

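```python
# take: the first 2 elements
print(rdd1.take(2))

# takeOrdered: the 2 smallest elements; negating a numeric key gives descending order
print(rdd2.takeOrdered(2, key=lambda x: -x))  # a trick to sort descending
```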

PySpark also provides transformations for key-value pairs (RDDs of (key, value) tuples), which behave somewhat like a Python dictionary. They include reduceByKey(), sortByKey(), and groupByKey().

- reduceByKey() works like reduce, except that values are combined separately for each key.
- sortByKey() sorts the pairs by key.
- groupByKey() collects all the values that share a key into one group.
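The results of these pair operations can be checked against plain-Python versions on the data3 list used in the next cell. The functions group_by_key(), reduce_by_key() and sort_by_key() are hypothetical helpers written for illustration, not PySpark functions. Spark returns lists of pairs, and it does not promise the order of keys or of values within a group. The dictionaries here simply keep first-seen order.

```python
from collections import defaultdict
from functools import reduce

data3 = [(1, 2), (3, 4), (5, 6), (5, 7), (3, 8)]

def group_by_key(pairs):
    # Gather every value under its key.
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return dict(groups)

def reduce_by_key(f, pairs):
    # Combine the values of each key with f.
    return {k: reduce(f, vs) for k, vs in group_by_key(pairs).items()}

def sort_by_key(pairs, ascending=True):
    return sorted(pairs, key=lambda kv: kv[0], reverse=not ascending)

print(reduce_by_key(lambda x, y: x + y, data3))  # {1: 2, 3: 12, 5: 13}
print(sort_by_key(data3))                        # [(1, 2), (3, 4), (3, 8), (5, 6), (5, 7)]
print(group_by_key(data3))                       # {1: [2], 3: [4, 8], 5: [6, 7]}
```

In real Spark, reduceByKey() also combines values inside each partition before shuffling data between workers. For aggregations this is usually cheaper than groupByKey() followed by a sum.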

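```python
# create an RDD of (key, value) pairs
data3 = [(1, 2), (3, 4), (5, 6), (5, 7), (3, 8)]
keyRDD = sc.parallelize(data3)
# reduceByKey: sum the values of each key
print(keyRDD.reduceByKey(lambda x, y: x + y).collect())
# sortByKey: sort the pairs by key (ascending by default)
print(keyRDD.sortByKey().collect())
# groupByKey: each group is an iterable, so convert it to a list before printing
print(keyRDD.groupByKey().mapValues(list).collect())
```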

Now that we have covered the most useful functions, let's combine them to do something interesting. A famous example of distributed computation is counting word frequencies. The commented-out lines in the next cell show how to create an RDD from a local file. Since I cannot submit local files with this tutorial, I will create the RDD by hand instead, using text copied from our course website.

What are the steps to count words?

First, I will separate the original text into words.

```python
# To load the text from a local file instead:
# file = 'test.txt'
# rdd = sc.textFile(file)

data4 = ['With data science, as with most disciplines, the best way to learn a particular topic is to teach it to others. The tutorial assignment of this course is meant to give you the chance to create a short tutorial on something related to data science. You can describe how to use a particular algorithm, library, methodology, or data set. Your tutorial should provide an introduction to this topic suitable for the level of other students taking this course. Ultimately, you will be graded by the course instructor and other students taking the course']
rdd4 = sc.parallelize(data4)
```
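The word-count steps can be sketched in plain Python on the first sentence of data4. Each step mirrors an RDD operation introduced earlier: flatMap, map, reduceByKey, and takeOrdered with a negated key. Lower-casing, the regex tokenizer, and breaking ties alphabetically are my own assumptions. They are not necessarily how this tutorial continues.

```python
import re
from collections import defaultdict

lines = ["With data science, as with most disciplines, the best way to learn "
         "a particular topic is to teach it to others."]

# Step 1 (like flatMap): split every line into lower-case words, dropping punctuation
words = [w for line in lines for w in re.findall(r"[a-z]+", line.lower())]
# Step 2 (like map): turn each word into a (word, 1) pair
pairs = [(w, 1) for w in words]
# Step 3 (like reduceByKey): add up the 1s for each word
counts = defaultdict(int)
for w, c in pairs:
    counts[w] += c
# Step 4 (like takeOrdered with a negated key): most frequent words first, ties alphabetical
top = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:3]
print(top)  # [('to', 3), ('with', 2), ('a', 1)]
```

With an RDD, the first three steps chain together as rdd4.flatMap(lambda line: line.lower().split()).map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b). Plain split() keeps punctuation attached to words such as "science,".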