In [1]:
sc

## Example 0
From “USF_Mission.txt”,
1. Split lines into words separated by spaces.
2. Print the five most frequent words and their occurrence ordered by occurrence.

In [None]:
file = sc.textFile('./2018-msan697-example/Data/USF_Mission.txt')

In [None]:
words = file.flatMap(lambda x:x.split())\
        .map(lambda x: (x,1))\
        .groupByKey()\
        .mapValues(lambda x:len(x))\
        .sortBy(lambda x:x[1],ascending = False)
#alternative approach
#instead of groupByKey() we can use reduceByKey(lambda x,y:x+y)

In [None]:
for stuff in words.take(5):
    print stuff[0] + " : " + str(stuff[1])

In [None]:
print words.toDebugString()

# Tuning Parallelism

**Partitions**
For parallel collections, users can specify the number of partitions to cut the RDD into.

your_rdd = sc.parallelize(data, partitionSize) or textFile(file_name, partitionSize)

sc.parallelize(data).transformation(...,partitionSize)

**Operations taking the number of partitions**
sortBy(), sortByKey(), groupBy(), reduceByKey(), join(), leftOuterJoin(), rightOuterJoin(), partitionBy(), combineByKey(), aggregateByKey(), foldByKey(), groupByKey(), cogroup(), subtractByKey(), subtract(), repartition(), coalesce()

**Checking**

your_rdd.getNumPartitions( )

**getNumPartitions( )** -Returns the number of partitions in RDD  

your_rdd.**glom( )**.collect( )

**glom( )** – return an RDD created by coalescing all elements within each partition into a list.

## Example1

Define “data” which is numbers between 1 and 9 and load them into random number of partitions (between 1 and 5).
Check the number of partitions.



In [None]:
import random 

In [None]:
for i in range(6):
    data = sc.parallelize(range(1,10), random.randint(1,7))
    print 'number of partitions: ' + str(data.getNumPartitions())
    print 'list view of the partitions: ' + str(data.glom().collect())

## Example 2

Given data = [('a',3),('b',4),('a',5)]

1. Parallelize data into 3 partitioners.

2. Perform reduceByKey(lambda x,y : x+y) and check the number of partitioner.

3. Perform reduceByKey(lambda x,y : x+y, NUM) and check the number of partitioner.


In [None]:
data = [('a',3),('b',4),('a',5)]

In [None]:
rdd = sc.parallelize(data,3)

In [None]:
print rdd.reduceByKey(lambda x,y:x+y).glom().collect()
print rdd.reduceByKey(lambda x,y:x+y).getNumPartitions()

In [None]:
print rdd.reduceByKey(lambda x,y:x+y,2).glom().collect()
print rdd.reduceByKey(lambda x,y:x+y,2).getNumPartitions()

## Example 3
Try different types of operations to change the number of partitions

In [None]:
rdd.sortBy(lambda x:x[1],numPartitions=2).glom().collect()

In [None]:
rdd.coalesce(1).glom().collect()

In [None]:
rdd.sortByKey(numPartitions=3).glom().collect()

In [None]:
rdd.groupBy(lambda x:x[0],numPartitions=2).glom().collect()

In [None]:
rdd.groupByKey(numPartitions=2).glom().collect()

In [None]:
rdd.partitionBy(numPartitions=4).glom().collect()

In [None]:
rdd.combineByKey

In [None]:
rdd.aggregateByKey

In [None]:
rdd.foldByKey

In [None]:
rdd.cogroup

In [None]:
rdd.subtractByKey

In [None]:
rdd.subtract

In [None]:
rdd.repartition(6).glom().collect()

## Tuning the level of Parallelism on Distributed Systems

Sending data back and forth between executors on parallelized distributed system causes network traffic.

    Better to place data that can minimize shuffling and improve performance.

Workload might not be evenly distributed in some partitions.
    
    May cause efficiency or memory issues.
    
**Solution**

**Partitionar**

Pair RDDs: partitionBy( )
Defines how the elements in a key-value pair RDD are partitioned by key.
Manage data commonly accessible together on the same node.
Organize data for minimizing network traffic/communication to improve
performance.
Maps each key to a partition ID, from 0 to numPartitions - 1. 
Types : HashPartitioner, RangePartitioner, Custom Partitioner.

HashPartitioner : partitioner using a hash value of a key.
partitionBy(N) uses HashPartitioner by default.
RangePartitioner : partitioner partitioning sorted RDDs into roughly equal ranges. 
CustomPartitioner : User-defined paritioner.

ex)
def custom_partitioner(key): return hash(key + 10)
pair_rdd.partitonBy(N, custom_partitioner)

## Example 4

Join “filtered_registered_business_sf.csv” and “supervisor_sf.csv” efficiently.

In [None]:
lines1 = sc.textFile("../Distributed_computing_2/2018-msan697-example/Data/filtered_registered_business_sf.csv")
business = lines1.map(lambda x:(x.split(",")[0],x))
lines2 = sc.textFile("../Distributed_computing_2/2018-msan697-example/Data/supervisor_sf.csv")
supervisor = lines2.map(lambda x:x.split(','))

In [None]:
partitionsize =5

In [None]:
for i in range(1,9):
    %timeit business.join(supervisor,numPartitions=i).persist().collect()

In [None]:
import time

In [None]:
start = time.time()
business.join(supervisor).collect()
print time.time() - start

In [None]:
partition_size = 6
business = business.partitionBy(partition_size).persist()
supervisor = supervisor.partitionBy(partition_size).persist()

In [None]:
supervisor.take(1)
business.take(1)

In [None]:
start = time.time()
business.join(supervisor).collect()
print time.time() - start

In [None]:
business.join(supervisor).glom().collect()

**Follow up question?**

Does Reducing shuffles benefits map(): False


** Difference between coalesce and repartition **

example 0 represents no data

first partition 0 b 0 b

second partition c 0 c 0 c

repartition to 3 partitions

c c c 
0 b 0
b 0 0 

coalsce ( minimized data movement)

0 b 0
c 0 c
0 c b




## Example 5
Create a custom practitioner using
1. hash value of the key.
2. hash value of the key + 10.

In [None]:
nums = sc.parallelize([(1,2),(3,4),(5,1),(2,3),(1,5)])
print nums.partitionBy(4).glom().collect()
print nums.coalesce(4).glom().collect()

In [None]:
def hash_partioner(key):
    return hash(key)
def custom_partitioner(key):
    return hash(key % 3)
print "hash: " + str(nums.partitionBy(4).glom().collect())
print "hash: " + str(nums.partitionBy(4,hash_partioner).glom().collect())
print "custom: " + str(nums.partitionBy(4, custom_partitioner).glom().collect())

## Tuning the Level of Parallelism on Distributed Systems
**Operations and Partitioner**

Operations benefiting from partitioning

Operations involving shuffling data by key across the network.

Ex. join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), lookUp(), etc.

Operations returns RDDs with known partitioning information. 

sortByKey() : range-partition

groupByKey() : hash-partition

Operations forget the parent’s partitioning information.

map() : because it can theoretically modify the key of each record.

**Repartitioning RDDs.**

Change the partitioning to distribute the workload more efficiently or avoid memory problems.

repartition(numPartitions: Int)

Shuffle data across the network to create a new set of partitions.
èExpensive.

coalesce(numPartitions: Int, shuffle = false)

Optimized version of repartition() – avoid data movement and reduce the number of RDD partitions.

Match the locality as much as possible, but try to balance partitions across the machines.


**Follow up question?**

Does Reducing shuffles benefits map(): False


** Difference between coalesce and repartition **

example 0 represents no data

first partition 0 b 0 b

second partition c 0 c 0 c

repartition to 3 partitions

c c c 
0 b 0
b 0 0 

coalsce ( minimized data movement)

0 b 0
c 0 c
0 c b


Coalesce will not increase the number of partitions.
if you put shuffle = true.  coalesce becomes repartition. 


## Example 6

Compare .coalesce() and .repartition().
Which one shuffles data less? .coalesce()

Can the number of partitions smaller than its parent’s number of partitions? yes 

In [None]:
nums = sc.parallelize([1,1,3,3,11,13,14],6)
nums.glom().collect()

In [None]:
%time print nums.repartition(3).glom().collect()
%time print nums.coalesce(3).glom().collect()

In [None]:
print nums.glom().collect()
print nums.repartition(10).glom().collect()
print nums.coalesce(10).glom().collect()
print nums.coalesce(10,True).glom().collect()

## Example 7 - PageRank
Write a page rank algorithms where
data = [(1,[2,3,4]), (2,[1,3]), (3,[4])] where the format is (URL, [LIST OF URLS]).


In [None]:
data = [(1,[2,3,4]),(2,[1,3]),(3,[4])]

In [None]:
links = sc.parallelize(data).partitionBy(2).cache()

In [None]:
ranks = links.map(lambda x: (x[0], 1.0)) #init each rank to 1

In [None]:
links.glom().collect()

In [None]:
#compute each url's contribution
def computeContribs(urls, rank):
    for url in urls:
        yield (url, rank / len(urls))

In [None]:
it_num = 10 #in practice it runs about 10 iterations
for i in range (it_num):
    contributions = links.join(ranks).flatMap(lambda x : computeContribs(x[1][0],x[1][1]))
    ranks = contributions.reduceByKey(lambda x,y : x+y).mapValues(lambda x : x*0.85+0.15)
ranks.collect()


In [None]:
#Basically repeating this block 10+ times.
contributions = links.join(ranks).flatMap(lambda x : computeContribs(x[1][0],x[1][1]))
contributions.collect()
ranks = contributions.reduceByKey(lambda x,y : x+y).mapValues(lambda x : x*0.85+0.15)
ranks.collect()

## Partition-specific Methods

Operations specifically designed to interact with partitions as atomic units.

**foreachPartition()** : Apply a function to each partition of an RDD.

**glom()** : Return all elements within each partition.

**lookup(key)**: Return values for the key using the partitioner to narrow its search to only the partitions where the key would present.

**mapPartitions()**: Return a new RDD by applying a function to each partition of the RDD.

# RDD Dependencies

## RDD Dependencies (RDD Lineage)

A dependency between an old and a new RDD is created, every time a transformation is performed on an RDD.

Spark’s execution model is based on directed acyclic graphs (DAGs), where nodes are RDDs and edges are dependencies.  

RDD ---dependency--> RDD 

The new RDD depend on the old RDD.

**RDD Resilience** - As Spark records the linage of each RDD, any RDDs can be reconstructed to the state it was at the time of the failure due to its **Lineage**. 

### RDD Dependency Types
**Narrow** – When no data shuffle between partitions is required.

**Wide** - When it requires shuffle when joining RDDs.

## Example 8

What are the types of dependencies in the following code?

In [None]:
import random

In [None]:
list = [random.randrange(10) for x in range(500)] #(narrow) create 500 rndm in [0,9]

In [None]:
listrdd = sc.parallelize(list,5) #(narrow) parallelize list with 5 partitions

In [None]:
pairs = listrdd.map(lambda x:(x,x*x))   #(narrow) create pairs of num, num^2 

In [None]:
reduced = pairs.reduceByKey(lambda v1, v2:v1+v2) #(wide) summing up squares (shuffle) 

In [None]:
finalrdd = reduced.mapPartitions(lambda itr: ["K="+str(k)+",V="+str(v) for (k,v) in itr])
#narrow 

In [None]:
for stuff in finalrdd.collect():
    print stuff

**toDebugString()**

Shows a textual representation of RDD dependencies.

The RDDs in the output appears in reverse order.

Useful in trying to minimize the number of shuffles.

The numbers in parentheses show the number of partitions of the corresponding RDD.

Every time you see a ShuffleRDD in the lineage chain, you can be sure that a shuffle will be performed at that point.

In [None]:
print finalrdd.toDebugString()

**Spark stage and tasks**

Every job is divided into stages based on the points where shuffles occur.

For each stage, tasks are created and sent to the executors.

After all tasks of a particular stage complete, the driver creates tasks for the next stage and sends them to the executors.

**Spark stages and tasks** in Example 8.

Stage 1 encompasses transformations that result in a shuffle: parallelize , map , and reduceByKey .

The results of Stage 1 are saved on disk as intermediate files on executor machines.

Stage 2, each partition receives data from these intermediate files belonging to it, and the execution is continued.

**checkpoint()**
Persist RDDs to disk.

After checkpointing, the RDD ’s dependencies are erased, as well as the information about its parent(s), because they won’t be needed for its recomputation any more.

cf. persist(), cache() : Keeps RDD’s dependencies.

Example.
sc.setCheckpointDir("dir") # sets the directory where RDDs will be checkpointed.

RDD.checkpoint() # will be triggered once an action is called.

RDD.action() # After checkpointing, the RDD linage including its parents’ information will be removed.

In [None]:
ordered_finalRDD = pairs.reduceByKey(lambda x1,x2 : x1+x2).sortByKey()

In [None]:
ordered_finalRDD.collect()

In [None]:
print ordered_finalRDD.toDebugString()

In [None]:
repartition = ordered_finalRDD.repartition(3)

In [None]:
print repartition.toDebugString()

sc.setCheckpointDir("checkpoint")
repartition.checkpoint() #all references to the parent will be removed
print repartition.isCheckpointed() #not checkpointed until action

repartition.count()
print repartition.isCheckpointed()
print repartition.getCheckpointFile()

In [None]:
print repartition.toDebugString()

# Using Shared Variables
Using shared variables to communicate with Spark executors.

Normally, when a function passed to a Spark operation is executed, it works on separate copies of all the variables used in the function. These variables are copied to each partition, and updates to the variables are not propagated back to the driver program.

Solution : Shared variables
Help maintain a global state or share data across tasks and partitions.

Accumulator : Aggregate information from executor nodes to the driver node.

Broadcast variable : Efficiently distribute large read-only values to executor nodes.

**Accumulator**

Is shared across executors that you can only add to.

Useful to implement global sums and counters.

Can be accessed by driver node, not from an executor node.

Create using sc.accumulator(inital_value)

The executor can add to the accumulator with .add(val) method or +. (it is write-only.)

The driver can call it using .value

If accumulators are used in transformation, the results may be erroneous.

Can cause accumulator values to be counted more than once depending on what happens on the cluster.

Should use accumulators within actions such as foreach().

## Example 9
Define a accumulator variable and initialize the value to be 0.
Generate values between 1 and 10,000,000 using .parallelize().
For each value, increment accumulator variable.

In [None]:
accum = sc.accumulator(0)

In [None]:
list = sc.parallelize(range(1,11))

In [None]:
list.glom().collect()

In [None]:
list.foreachPartition(lambda x: accum.add(1))

In [None]:
list.collect()

In [None]:
hello = accum.value

In [None]:
hello

In [None]:
list.foreach(lambda x:accum.add(1))

In [None]:
hello

In [None]:
accum.value

In [None]:
accum.value

In [None]:
accum.add(-2)

In [None]:
accum.value

**Shared variables**
**Broadcast variable** : Read-only object set by the driver.

Efficiently distribute large read-only values such as lookup tables to
executor nodes.

The value is sent to each node only once (not per task). ◦ Create a broadcast variable using sc.broadcast(value).
Access the value with .value.
Should .unpersist() for removing a broadcast variable from memory on all workers.

**Advantages**
Use an efficient and scalable peer-to-peer distribution mechanism. ◦ Eliminate the need for a shuffle operation.

Replicate data once per worker (not once per task). ◦ Are serialized objects (can be read efficiently.).

## Example 11
Generate a broadcast variable and access, modify and unpersist it.

In [None]:
first_broadcast = sc.broadcast([1,2,3])

In [None]:
first_broadcast.value

In [None]:
x = first_broadcast

In [None]:
type(x)

In [None]:
list1 = sc.parallelize(range(1,10))

In [None]:
list1.glom().collect()

In [None]:
list1.toDebugString()

In [None]:
add_broadcastVar = list1.map(lambda x: x + first_broadcast.value[0])

In [None]:
add_broadcastVar.glom().collect()

In [None]:
first_broadcast = first_broadcast + 1

In [None]:
first_broadcast.unpersist()

In [None]:
first_broadcast.value

time test partition

In [None]:
test = sc.parallelize(range(1,10000))

In [None]:
test.getNumPartitions()

In [None]:
test1 = test.coalesce(1)

In [None]:
test1.getNumPartitions()

# Day 2

## Loading and Saving Data
Filesystems

Local Filesystem

Specify sc.textFile(file:///path)

The filesystem should be available at the same path on all nodes in the cluster for both the master and executors. – If the file is not on all nodes in the cluster, load it locally and then call parallelize to distribute the contents to workers.

Filesystems
Amazon S3 (like dropbox)
S3 (Simple Storage Service) – Web Storage Service
Place data on S3.
Log in to the console on https://aws.amazon.com/.
Choose S3 Service.
Create a bucket and upload ”README.md”.

In [None]:
rdd = sc.textFile("s3n://myfirstbucket022881/README.md")

In [None]:
rdd.collect()

## Spark SQL

**DataFrame**

Handle structured, distributed data with a table-like representation with named column declared with column types.

cf. RDD : low-level and direct way of manipulating data in Spark.

Special case of the DataSet type.

You can join, query and save DataFrames.

Spark SQL lets you register DataFrames from different sources (SQL, Parquet, JSON, ORC) as tables in the table catalog and query them.


Data has to bave some structure before going into Spark SQL

RDD.toDF() goes to dataframe

dataframes are optimized for queries, joins and features important to structured data. 

DataFrame.rdd goes to RDD

**Creating DataFrames**

Convert existing RDDs.

In order to create DataFrames, you need to load data as text, parse the lines, and identify elements.

You can create DataFrames from RDD.

1) Using RDDs containing row data as tuples.  Limited as it doesn’t allow to specify all the schema attributes. 

2) Specifying a schema using createDataFrame.



## Example 2
Create DataFrame, using RDDs containing row data as tuples. Step 1. Load the italianPosts.csv parsed with ”~” into an RDD.

In [None]:
# Create DataFrame, using RDDs

In [2]:
itPostsRows = sc.textFile("./2018-msan697-example/Data/Italian_Stack_Exchange/italianPosts.csv")

In [3]:
itPostsSplit = itPostsRows.map(lambda x: x.split("~"))

In [4]:
#Step 2-2. Create DataFrame, using RDDs containing row data as tuples

In [5]:
itPostsRDD = itPostsSplit.map(lambda x:(x[0],x[1],x[2],x[3],x[4],x[5],x[6],x[7],x[8],x[9],x[10],x[11],x[12]))

In [6]:
itPostsRDD.take(1)

[(u'4',
  u'2013-11-11 18:21:10.903',
  u'17',
  u"&lt;p&gt;The infinitive tense is commonly used for expressing rules especially in signs (of any kind, not just road signs).&lt;/p&gt;&lt;p&gt;For instance&lt;/p&gt;&lt;blockquote&gt;  &lt;p&gt;Non fumare&lt;br&gt;  Non calpestare il prato&lt;br&gt;  Tenere la destra&lt;/p&gt;&lt;/blockquote&gt;&lt;p&gt;The language &quot;trick&quot; behind this use of the infinitive form is the omission of the clause &lt;em&gt;Si prega di&lt;/em&gt; or equivalent, so the above sentences are read as&lt;/p&gt;&lt;blockquote&gt;  &lt;p&gt;&lt;em&gt;&lt;strong&gt;Si prega di&lt;/em&gt;&lt;/strong&gt; non fumare&lt;br&gt;  &lt;strong&gt;&lt;em&gt;Si prega di&lt;/em&gt;&lt;/strong&gt; non calpestare il prato&lt;br&gt;  &lt;strong&gt;&lt;em&gt;Si prega di&lt;/em&gt;&lt;/strong&gt; tenere la destra&lt;/p&gt;&lt;/blockquote&gt;&lt;p&gt;Such form is not used in everyday's spoken language, as it's a convention used for giving orders and stating rules in an impers

In [7]:
itPostsDFrame = itPostsRDD.toDF()

In [8]:
print itPostsDFrame.show()

+---+--------------------+---+--------------------+---+--------------------+----+--------------------+--------------------+----+----+---+----+
| _1|                  _2| _3|                  _4| _5|                  _6|  _7|                  _8|                  _9| _10| _11|_12| _13|
+---+--------------------+---+--------------------+---+--------------------+----+--------------------+--------------------+----+----+---+----+
|  4|2013-11-11 18:21:...| 17|&lt;p&gt;The infi...| 23|2013-11-10 19:37:...|null|                    |                    |null|null|  2|1165|
|  5|2013-11-10 20:31:...| 12|&lt;p&gt;Come cre...|  1|2013-11-10 19:44:...|  61|Cosa sapreste dir...| &lt;word-choice&gt;|   1|null|  1|1166|
|  2|2013-11-10 20:31:...| 17|&lt;p&gt;Il verbo...|  5|2013-11-10 19:58:...|null|                    |                    |null|null|  2|1167|
|  1|2014-07-25 13:15:...|154|&lt;p&gt;As part ...| 11|2013-11-10 22:03:...| 187|Ironic constructi...|&lt;english-compa...|   4|1170|  1|1168|

In [9]:
itPostsDFrame.select('_1', '_2').show()

+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  4|2013-11-11 18:21:...|
|  5|2013-11-10 20:31:...|
|  2|2013-11-10 20:31:...|
|  1|2014-07-25 13:15:...|
|  0|2013-11-10 22:15:...|
|  2|2013-11-10 22:17:...|
|  1|2013-11-11 09:51:...|
|  1|2013-11-12 23:57:...|
|  9|2014-01-05 11:13:...|
|  0|2013-11-11 10:58:...|
|  1|2014-01-16 19:56:...|
|  0|2013-11-11 14:36:...|
|  3|2014-01-16 19:56:...|
|  0|2013-11-11 12:00:...|
|  0|2013-11-12 11:24:...|
|  4|2013-11-11 19:54:...|
|  0|2013-11-11 18:20:...|
|  1|2013-11-11 14:36:...|
|  2|2013-11-14 09:56:...|
|  2|2013-11-11 23:23:...|
+---+--------------------+
only showing top 20 rows



In [10]:
itPostsDFrame.columns

['_1',
 '_2',
 '_3',
 '_4',
 '_5',
 '_6',
 '_7',
 '_8',
 '_9',
 '_10',
 '_11',
 '_12',
 '_13']

In [11]:
#to name columns

itPostsDF = itPostsRDD.toDF(["commentCount", "lastActivityDate", "ownerUserId", "body", "score", "creationDate", "viewCount", "title", "tags", "answerCount", "acceptedAnswerId", "postTypeId", "id"])

itPostsDF.show()

+------------+--------------------+-----------+--------------------+-----+--------------------+---------+--------------------+--------------------+-----------+----------------+----------+----+
|commentCount|    lastActivityDate|ownerUserId|                body|score|        creationDate|viewCount|               title|                tags|answerCount|acceptedAnswerId|postTypeId|  id|
+------------+--------------------+-----------+--------------------+-----+--------------------+---------+--------------------+--------------------+-----------+----------------+----------+----+
|           4|2013-11-11 18:21:...|         17|&lt;p&gt;The infi...|   23|2013-11-10 19:37:...|     null|                    |                    |       null|            null|         2|1165|
|           5|2013-11-10 20:31:...|         12|&lt;p&gt;Come cre...|    1|2013-11-10 19:44:...|       61|Cosa sapreste dir...| &lt;word-choice&gt;|          1|            null|         1|1166|
|           2|2013-11-10 20:31:...|

In [12]:
#to select specific columns
itPostsDF.select('commentCount').show()

+------------+
|commentCount|
+------------+
|           4|
|           5|
|           2|
|           1|
|           0|
|           2|
|           1|
|           1|
|           9|
|           0|
|           1|
|           0|
|           3|
|           0|
|           0|
|           4|
|           0|
|           1|
|           2|
|           2|
+------------+
only showing top 20 rows



In [13]:
itPostsDF.printSchema()

root
 |-- commentCount: string (nullable = true)
 |-- lastActivityDate: string (nullable = true)
 |-- ownerUserId: string (nullable = true)
 |-- body: string (nullable = true)
 |-- score: string (nullable = true)
 |-- creationDate: string (nullable = true)
 |-- viewCount: string (nullable = true)
 |-- title: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- answerCount: string (nullable = true)
 |-- acceptedAnswerId: string (nullable = true)
 |-- postTypeId: string (nullable = true)
 |-- id: string (nullable = true)



Convert existing RDDs.

2) Specifying a schema using createDataFrame.

Use createDataFrame(data, schema=None, samplingRatio=None , verifySchema=True) 
Creates a DataFrame from an RDD.
Parameters

**data**–anRDDofRow/tuple/list/dict,list,orpandas.DataFrame.
**schema**–aStructTypeorlistofcolumnnames.defaultNone.
When schema is a list of column names,the type of each column will be inferred from data. 
**samplingRatio** –thesampleratioofrowsusedforinferringtheschema.
**verifySchema** –verify data types of every row against schema.

1.
Convert existing RDDs.
2) Specifying a schema using createDataFrame.
Use createDataFrame(data, schema=None, samplingRatio=None , verifySchema=True) 

Parameters

schema – a StructType or list of column names. default None.
StructType : consisting of a list of StructField.

StructField : including column name(string), data type, nullable(default : True), metadata( default: None)

data type – NullType(), StringType(), BinaryType(), BooleanType(), DateType(),TimstampType(), DoubleType(), FloatType(), IntegerType(), etc.

Ex.
    from pyspark.sql.types import *
    schema = StructType([StructField("name", StringType(), True),
                StructField("age", IntegerType(), True)])

## Example 3
Create DataFrame, specifying a schema using createDataFrame.

Step 1. Load the italianPosts.csv parsed with ”~” into an RDD.

In [14]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc) 

In [15]:
itPostsRows = sc.textFile("./2018-msan697-example/Data/Italian_Stack_Exchange/italianPosts.csv")

In [16]:
sqlContext = SQLContext(sc)
from pyspark.sql import Row
from datetime import datetime

def toIntSafe(inval):
  try:
    return int(inval)
  except ValueError:
    return None

def toTimeSafe(inval):
  try:
    return datetime.strptime(inval, "%Y-%m-%d %H:%M:%S.%f")
  except ValueError:
    return None

def toLongSafe(inval):
  try:
    return long(inval)
  except ValueError:
    return None
    
def stringToPost(row):
  r = row.encode('utf8').split("~")
  return Row(
    toIntSafe(r[0]),
    toTimeSafe(r[1]),
    toIntSafe(r[2]),
    r[3],
    toIntSafe(r[4]),
    toTimeSafe(r[5]),
    toIntSafe(r[6]),
    toIntSafe(r[7]),
    r[8],
    toIntSafe(r[9]),
    toLongSafe(r[10]),
    toLongSafe(r[11]),
    long(r[12]))

In [17]:
from pyspark.sql.types import *
postSchema = StructType([
  StructField("commentCount", IntegerType(),True),
  StructField("lastActivityDate", TimestampType(), True),
  StructField("ownerUserId", LongType(), True),
  StructField("body", StringType(), True),
  StructField("score", IntegerType(), True),
  StructField("creationDate", TimestampType(), True),
  StructField("viewCount", IntegerType(), True),
  StructField("title", StringType(), True),
  StructField("tags", StringType(), True),
  StructField("answerCount", IntegerType(), True),
  StructField("acceptedAnswerId", LongType(), True),
  StructField("postTypeId", LongType(), True),
  StructField("id", LongType(), False)
  ])

In [18]:
rowRDD = itPostsRows.map(lambda x:stringToPost(x))

In [19]:
itPostsDFStruct = sqlContext.createDataFrame(rowRDD, postSchema)

In [20]:
itPostsDFStruct.printSchema()

root
 |-- commentCount: integer (nullable = true)
 |-- lastActivityDate: timestamp (nullable = true)
 |-- ownerUserId: long (nullable = true)
 |-- body: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- creationDate: timestamp (nullable = true)
 |-- viewCount: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- answerCount: integer (nullable = true)
 |-- acceptedAnswerId: long (nullable = true)
 |-- postTypeId: long (nullable = true)
 |-- id: long (nullable = false)



In [21]:
itPostsDFStruct.select('postTypeId').show()

+----------+
|postTypeId|
+----------+
|         2|
|         1|
|         2|
|         1|
|         2|
|         2|
|         2|
|         2|
|         1|
|         2|
|         1|
|         1|
|         2|
|         2|
|         1|
|         2|
|         2|
|         2|
|         1|
|         1|
+----------+
only showing top 20 rows



## DataFram API Basics
Basic APIs (Example 6)

select()

drop()

filter(),where()

withColumnRenamed(), witColumn() (Renaming and adding columns)

orderBy(), sort()

In [22]:
itPostsDFStruct.columns

['commentCount',
 'lastActivityDate',
 'ownerUserId',
 'body',
 'score',
 'creationDate',
 'viewCount',
 'title',
 'tags',
 'answerCount',
 'acceptedAnswerId',
 'postTypeId',
 'id']

In [23]:
itPostsDFStruct.select('lastActivityDate','body').show()

+--------------------+--------------------+
|    lastActivityDate|                body|
+--------------------+--------------------+
|2013-11-11 18:21:...|&lt;p&gt;The infi...|
|2013-11-10 20:31:...|&lt;p&gt;Come cre...|
|2013-11-10 20:31:...|&lt;p&gt;Il verbo...|
|2014-07-25 13:15:...|&lt;p&gt;As part ...|
|2013-11-10 22:15:...|&lt;p&gt;&lt;em&g...|
|2013-11-10 22:17:...|&lt;p&gt;There's ...|
|2013-11-11 09:51:...|&lt;p&gt;As other...|
|2013-11-12 23:57:...|&lt;p&gt;The expr...|
|2014-01-05 11:13:...|&lt;p&gt;When I w...|
|2013-11-11 10:58:...|&lt;p&gt;Wow, wha...|
|2014-01-16 19:56:...|&lt;p&gt;Suppose ...|
|2013-11-11 14:36:...|&lt;p&gt;Except w...|
|2014-01-16 19:56:...|&lt;p&gt;Both you...|
|2013-11-11 12:00:...|&lt;blockquote&gt...|
|2013-11-12 11:24:...|&lt;p&gt;Comparin...|
|2013-11-11 19:54:...|&lt;p&gt;Using th...|
|2013-11-11 18:20:...|&lt;p&gt;I would ...|
|2013-11-11 14:36:...|&lt;p&gt;Putting ...|
|2013-11-14 09:56:...|&lt;p&gt;Many peo...|
|2013-11-11 23:23:...|&lt;p&gt;S

In [24]:
itPostsDFStruct.select(itPostsDFStruct['body']).show()

+--------------------+
|                body|
+--------------------+
|&lt;p&gt;The infi...|
|&lt;p&gt;Come cre...|
|&lt;p&gt;Il verbo...|
|&lt;p&gt;As part ...|
|&lt;p&gt;&lt;em&g...|
|&lt;p&gt;There's ...|
|&lt;p&gt;As other...|
|&lt;p&gt;The expr...|
|&lt;p&gt;When I w...|
|&lt;p&gt;Wow, wha...|
|&lt;p&gt;Suppose ...|
|&lt;p&gt;Except w...|
|&lt;p&gt;Both you...|
|&lt;blockquote&gt...|
|&lt;p&gt;Comparin...|
|&lt;p&gt;Using th...|
|&lt;p&gt;I would ...|
|&lt;p&gt;Putting ...|
|&lt;p&gt;Many peo...|
|&lt;p&gt;Sono un'...|
+--------------------+
only showing top 20 rows



**Basic APIs**

drop()

Select all except one column.



In [25]:
itPostsDFStruct.drop('body').show()

+------------+--------------------+-----------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+
|commentCount|    lastActivityDate|ownerUserId|score|        creationDate|viewCount|title|                tags|answerCount|acceptedAnswerId|postTypeId|  id|
+------------+--------------------+-----------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+
|           4|2013-11-11 18:21:...|         17|   23|2013-11-10 19:37:...|     null| null|                    |       null|            null|         2|1165|
|           5|2013-11-10 20:31:...|         12|    1|2013-11-10 19:44:...|       61| null| &lt;word-choice&gt;|          1|            null|         1|1166|
|           2|2013-11-10 20:31:...|         17|    5|2013-11-10 19:58:...|     null| null|                    |       null|            null|         2|1167|
|           1|2014-07-25 13:15:...|        154|   11|2013-

In [26]:
itPostsDFFilterId = itPostsDFStruct.where("id > 2000 and id < 2010")

In [27]:
itPostsDFFilterId.select('id','body').show()

+----+--------------------+
|  id|                body|
+----+--------------------+
|2001|&lt;p&gt;Sardinia...|
|2002|&lt;p&gt;I am fro...|
|2003|&lt;p&gt;La rispo...|
|2004|&lt;p&gt;In itali...|
|2005|&lt;p&gt;Mi Ã¨ st...|
|2006|&lt;p&gt;âCa.â...|
|2007|&lt;p&gt;Un chimi...|
|2008|&lt;p&gt;&quot;sp...|
|2009|&lt;p&gt;Ad occhi...|
+----+--------------------+



In [28]:
#renaming a column
itPostsDFFilterId.withColumnRenamed('id','selected_id')

DataFrame[commentCount: int, lastActivityDate: timestamp, ownerUserId: bigint, body: string, score: int, creationDate: timestamp, viewCount: int, title: string, tags: string, answerCount: int, acceptedAnswerId: bigint, postTypeId: bigint, selected_id: bigint]

In [29]:
itPostsDFFilterId.columns

['commentCount',
 'lastActivityDate',
 'ownerUserId',
 'body',
 'score',
 'creationDate',
 'viewCount',
 'title',
 'tags',
 'answerCount',
 'acceptedAnswerId',
 'postTypeId',
 'id']

In [30]:
itPostsDFFWIthRatio = itPostsDFFilterId.withColumn('score_div_answer'\
 ,itPostsDFFilterId['score']/itPostsDFFilterId['answerCount'])
                                                   

In [31]:
itPostsDFFWIthRatio.columns

['commentCount',
 'lastActivityDate',
 'ownerUserId',
 'body',
 'score',
 'creationDate',
 'viewCount',
 'title',
 'tags',
 'answerCount',
 'acceptedAnswerId',
 'postTypeId',
 'id',
 'score_div_answer']

In [32]:
itPostsDFFWIthRatio.select('score_div_answer').show()

+----------------+
|score_div_answer|
+----------------+
|            null|
|            null|
|            null|
|            null|
|             2.0|
|            null|
|             2.0|
|            null|
|            null|
+----------------+



In [33]:
itPostsDFFWIthRatio.where(itPostsDFFWIthRatio['score_div_answer'].isNotNull()).show()

+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+----------------+
|commentCount|    lastActivityDate|ownerUserId|                body|score|        creationDate|viewCount|title|                tags|answerCount|acceptedAnswerId|postTypeId|  id|score_div_answer|
+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+----------------+
|           8|2014-06-14 07:37:...|        223|&lt;p&gt;Mi Ã¨ st...|    2|2014-06-05 12:04:...|      104| null|&lt;word-usage&gt...|          1|            2006|         1|2005|             2.0|
|           1|2014-07-02 19:39:...|        223|&lt;p&gt;Un chimi...|    2|2014-06-06 11:34:...|       45| null|&lt;word-usage&gt...|          1|            null|         1|2007|             2.0|
+------------+-----------

In [34]:
#sorting
itPostsDFFilterId.sort("id", ascending=False).show()

+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+
|commentCount|    lastActivityDate|ownerUserId|                body|score|        creationDate|viewCount|title|                tags|answerCount|acceptedAnswerId|postTypeId|  id|
+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+
|           0|2014-06-06 13:18:...|        674|&lt;p&gt;Ad occhi...|    5|2014-06-06 13:18:...|     null| null|                    |       null|            null|         2|2009|
|           3|2014-06-07 12:11:...|        676|&lt;p&gt;&quot;sp...|    0|2014-06-06 12:29:...|     null| null|                    |       null|            null|         2|2008|
|           1|2014-07-02 19:39:...|        223|&lt;p&gt;Un chimi...|    2|2014-06-06 11:34:...|       45| null

## Example 5 Practice Quiz
Read “Italian_Stack_Exchange/italianComments.csv”

Create a dataframe with first 3 columns in the file to have  

Id : long, not nullable

commentDate: timestamp, nullable

comment : string, nullable


Find a row that the commentDate is 2013-11-07 and include “@Daniele”.

In [35]:
itStackRows = sc.textFile("./2018-msan697-example/Data/Italian_Stack_Exchange/italianComments.csv")

In [36]:
sqlContext = SQLContext(sc)
from pyspark.sql import Row
from datetime import datetime

def toIntSafe(inval):
  try:
    return int(inval)
  except ValueError:
    return None

def toTimeSafe(inval):
  try:
    return datetime.strptime(inval, "%Y-%m-%d %H:%M:%S.%f")
  except ValueError:
    return None

def toLongSafe(inval):
  try:
    return long(inval)
  except ValueError:
    return None
    
def stringToPost(row):
  r = row.encode('utf8').split("~")
  return Row(
    toLongSafe(r[0]),
    toTimeSafe(r[1]),
    r[2])

In [37]:
from pyspark.sql.types import *
postSchema = StructType([
  StructField("id", LongType(),False),
  StructField("commentDate", TimestampType(), True),
  StructField("comment", StringType(), True),
  ])

In [38]:
rowRDD = itStackRows.map(lambda x:stringToPost(x))

In [39]:
itStackDFStruct = sqlContext.createDataFrame(rowRDD, postSchema)

In [40]:
itStackDFStruct.printSchema()

root
 |-- id: long (nullable = false)
 |-- commentDate: timestamp (nullable = true)
 |-- comment: string (nullable = true)



In [41]:
itStackDFStruct.show()

+---+--------------------+--------------------+
| id|         commentDate|             comment|
+---+--------------------+--------------------+
| 18|2013-11-05 20:39:...|It's going to be ...|
|  6|2013-11-05 20:41:...|Why not &quot;IL ...|
| 18|2013-11-05 20:43:...|    Yep, added that.|
|  6|2013-11-05 20:45:...|La squadra Milan ...|
|  6|2013-11-05 20:46:...|`ExamplesLa (squa...|
| 17|2013-11-05 20:48:...|Actually, no. As ...|
|  6|2013-11-05 20:52:...|Oh, c'mon: http:/...|
| 18|2013-11-05 20:54:...|There's no citati...|
| 12|2013-11-05 20:57:...|Se il genere dei ...|
| 18|2013-11-05 21:02:...|E' un'eccezione: ...|
| 12|2013-11-05 21:03:...|I agree with Dami...|
| 17|2013-11-05 21:14:...|Agreed, even thou...|
|  6|2013-11-05 21:15:...|@GabrielePetronel...|
| 12|2013-11-05 21:15:...|+1, but, neverthe...|
|  6|2013-11-05 21:17:...|@KyriakosKyritsis...|
| 17|2013-11-05 21:18:...|@KyriakosKyritsis...|
| 17|2013-11-05 21:21:...|*Il Cairo* Ã¨ il ...|
|  5|2013-11-05 21:42:...|Conversely, En

In [42]:
import pyspark.sql.functions as func
itStackDFStruct.filter(func.to_date(itStackDFStruct['commentDate'])=='2013-11-07').show()

+---+--------------------+--------------------+
| id|         commentDate|             comment|
+---+--------------------+--------------------+
|  5|2013-11-07 07:15:...|Very related ques...|
|  5|2013-11-07 07:21:...|I would add that ...|
| 57|2013-11-07 09:59:...|Nice quote! :D I ...|
| 79|2013-11-07 10:34:...|Great question, I...|
| 22|2013-11-07 10:55:...|That's indeed the...|
|120|2013-11-07 12:08:...|Yes, exactly! Man...|
| 70|2013-11-07 13:23:...|I changed your â...|
| 70|2013-11-07 13:30:...|It depends on usa...|
| 77|2013-11-07 15:25:...|  right, I modify it|
| 37|2013-11-07 15:30:...|@Daniele B: I kno...|
| 12|2013-11-07 15:54:...|As per the posted...|
| 19|2013-11-07 17:11:...|&quot;Ci&quot; in...|
|  5|2013-11-07 18:43:...|@kiamlaluno: *dia...|
| 98|2013-11-07 18:50:...|@kiamlaluno I don...|
| 63|2013-11-07 20:00:...|I am sorry I didn...|
| 22|2013-11-07 20:04:...|Just for clarity,...|
| 63|2013-11-07 20:08:...|I was referring t...|
| 22|2013-11-07 20:12:...|&quot;Mi sei a

In [43]:
import pyspark.sql.functions as func
#instr() : Returns the position of the first occurrence of substr in str
itStackDFStruct.filter((func.instr(itStackDFStruct['comment'],"@Daniele") > 0)).show()

+---+--------------------+--------------------+
| id|         commentDate|             comment|
+---+--------------------+--------------------+
| 37|2013-11-07 15:30:...|@Daniele B: I kno...|
+---+--------------------+--------------------+



In [44]:
#to_date() : Converts a string formatted like yyyy-MM-dd into a Date object.
itStackDFStruct.filter(func.to_date(itStackDFStruct['commentDate'])=='2013-11-07').filter((func.instr(itStackDFStruct['comment'],"@Daniele") > 0)).show()

+---+--------------------+--------------------+
| id|         commentDate|             comment|
+---+--------------------+--------------------+
| 37|2013-11-07 15:30:...|@Daniele B: I kno...|
+---+--------------------+--------------------+



In [45]:
k = sqlContext.read\
.csv("./2018-msan697-example/Data/Italian_Stack_Exchange/italianComments.csv",schema = postSchema,sep="~")

In [46]:
pandas_dataframe = k.toPandas()

## Example 6

 Users can upvote and downvote for marking certain questions useful or not.
 
italianVotes.csv (each column is delimiated with ~)

1. postId—Post’s unique ID, long()

2. id- Vote’s unique ID, long()

3. voteType – int()

4. creationTime – datetime()

Read “Italian_Stack_Exchange/italianVotes.csv”.

Create a dataframe in the file to have the following schema.

Sort the data where voteTypeId is 1 by creationTime in ascending order.
 

In [15]:
itstackvotes = sc.textFile("./2018-msan697-example/Data/Italian_Stack_Exchange/italianVotes.csv")

In [21]:
itstackvotes = itstackvotes.map(lambda x:x.encode('utf8'))\
    .map(lambda x:x.split('~'))

In [76]:
sqlContext = SQLContext(sc)
from pyspark.sql import Row
from datetime import datetime

def toIntSafe(inval):
    try:
        return int(inval)
    except ValueError:
        return None

def toTimeSafe(inval):
    try:
        return datetime.strptime(inval, "%Y-%m-%d %H:%M:%S.%f")
    except ValueError:
        return None

def toLongSafe(inval):
    try:
        return long(inval)
    except ValueError:
        return None
    
def stringToPost(row):
    return Row(
        toLongSafe(row[0]),
        long(row[1]),
        int(row[2]),
        toTimeSafe(row[3])
        )

In [77]:
from pyspark.sql.types import *
postSchema = StructType([
  StructField("id", LongType(),False),
  StructField("postId", LongType(), False),
  StructField("voteTypeId", IntegerType(),False),
  StructField("creationTime", TimestampType(), False)
  ])

In [78]:
rowRDD = itstackvotes.map(lambda x:stringToPost(x))

In [79]:
itStackvotesDFStruct = sqlContext.createDataFrame(rowRDD, postSchema)

In [80]:
rowRDD

PythonRDD[79] at RDD at PythonRDD.scala:48

In [81]:
itStackvotesDFStruct.printSchema()

root
 |-- id: long (nullable = false)
 |-- postId: long (nullable = false)
 |-- voteTypeId: integer (nullable = false)
 |-- creationTime: timestamp (nullable = false)



In [82]:
itStackvotesDFStruct.show()

+----+------+----------+-------------------+
|  id|postId|voteTypeId|       creationTime|
+----+------+----------+-------------------+
|2657|   135|         2|2013-11-22 00:00:00|
|2658|   142|         2|2013-11-22 00:00:00|
|2659|   142|         1|2013-11-22 00:00:00|
|2660|   140|         2|2013-11-22 00:00:00|
|2661|   140|         1|2013-11-22 00:00:00|
|2662|  1354|         2|2013-11-22 00:00:00|
|2663|  1356|         2|2013-11-22 00:00:00|
|2664|  1353|         2|2013-11-22 00:00:00|
|2665|  1351|         2|2013-11-22 00:00:00|
|2667|  1357|         2|2013-11-22 00:00:00|
|2668|  1357|         2|2013-11-22 00:00:00|
|2669|  1351|         2|2013-11-22 00:00:00|
|2670|  1351|         1|2013-11-22 00:00:00|
|2671|  1352|         2|2013-11-22 00:00:00|
|2672|  1349|         2|2013-11-22 00:00:00|
|2673|  1357|         2|2013-11-22 00:00:00|
|2674|  1342|         2|2013-11-22 00:00:00|
|2675|  1340|         2|2013-11-22 00:00:00|
|2676|  1321|         2|2013-11-22 00:00:00|
|2677|  13

In [39]:
final = itStackvotesDFStruct.where('voteTypeID == 1')\
            .sort('creationTime',ascending =True)

## Example 7

In [68]:
itStackvotesDFStruct.groupBy('postId').avg('id').sort('avg(id)',acending = True).show()

+------+------------------+
|postId|           avg(id)|
+------+------------------+
|     7|              27.0|
|    21|              54.5|
|    33|              96.0|
|    18|             126.2|
|    26|233.66666666666666|
|    62|             263.0|
|    51| 291.3333333333333|
|    65|             294.0|
|    44|             297.0|
|    19| 300.3333333333333|
|    73|             309.0|
|    35|           370.125|
|    81|             370.6|
|    98|             448.0|
|    99|             462.0|
|    39|             469.8|
|    25|           504.375|
|    52| 520.7692307692307|
|    46|             521.4|
|   119| 524.6666666666666|
+------+------------------+
only showing top 20 rows



## Example 8

## Example 9

## Example 10

Create a UDF called scoreString which discretize scores 

Lower than 10 : low

Between 10 and 20 : med

Higher than 20 : high

and show the score strings.

In [54]:
#creates a function that is usable in data frames
from pyspark.sql.functions import *
def score_discrete(x):
    if x < 10:
        return 'low'
    elif (x >= 10 and x <20):
        return 'med'
    else:
        return 'high'
score_string = udf(lambda x: score_discrete(x))

In [55]:
itPostsDFStruct.select('id','score', score_string('score')).show()

+----+-----+---------------+
|  id|score|<lambda>(score)|
+----+-----+---------------+
|1165|   23|           high|
|1166|    1|            low|
|1167|    5|            low|
|1168|   11|            med|
|1169|    3|            low|
|1170|    8|            low|
|1171|    3|            low|
|1172|    1|            low|
|1173|    5|            low|
|1174|    5|            low|
|1175|    4|            low|
|1176|    3|            low|
|1177|    6|            low|
|1178|    1|            low|
|1179|    3|            low|
|1180|    5|            low|
|1181|    8|            low|
|1182|   11|            med|
|1183|    6|            low|
|1184|    7|            low|
+----+-----+---------------+
only showing top 20 rows



In [85]:
itStackvotesDFStruct.toPandas()

Unnamed: 0,id,postId,voteTypeId,creationTime
0,2657,135,2,2013-11-22
1,2658,142,2,2013-11-22
2,2659,142,1,2013-11-22
3,2660,140,2,2013-11-22
4,2661,140,1,2013-11-22
5,2662,1354,2,2013-11-22
6,2663,1356,2,2013-11-22
7,2664,1353,2,2013-11-22
8,2665,1351,2,2013-11-22
9,2667,1357,2,2013-11-22


**Grouping Data **

groupBy()

take column names or a list of column objects.

Return GroupedData Object.

Can use an aggregate function or agg(list_of_aggregate_fuctions).

## Example 11
Calculate min, max, average score per ownerUserId.

# Grouping and Joining DataFrame

join(dataframe, condition, join_type)

Join types : inner (default), outer, left_outer, right_outer, leftsemi
  

In [56]:
itPostsDFStruct.printSchema()

root
 |-- commentCount: integer (nullable = true)
 |-- lastActivityDate: timestamp (nullable = true)
 |-- ownerUserId: long (nullable = true)
 |-- body: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- creationDate: timestamp (nullable = true)
 |-- viewCount: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- answerCount: integer (nullable = true)
 |-- acceptedAnswerId: long (nullable = true)
 |-- postTypeId: long (nullable = true)
 |-- id: long (nullable = false)



In [57]:
itStackvotesDFStruct.printSchema()

root
 |-- id: long (nullable = false)
 |-- postId: long (nullable = false)
 |-- voteTypeId: integer (nullable = false)
 |-- creationTime: timestamp (nullable = false)



In [61]:
itStackvotesDFStruct.show()

Py4JJavaError: An error occurred while calling o216.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 1 times, most recent failure: Lost task 0.0 in stage 30.0 (TID 32, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-50-0914e07ee4a8>", line 1, in <lambda>
  File "<ipython-input-48-200c9eab1c5b>", line 27, in stringToPost
NameError: global name 'toLongsafe' is not defined

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
	at sun.reflect.GeneratedMethodAccessor76.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-50-0914e07ee4a8>", line 1, in <lambda>
  File "<ipython-input-48-200c9eab1c5b>", line 27, in stringToPost
NameError: global name 'toLongsafe' is not defined

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [60]:
itPostsDFStruct.show()

+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+
|commentCount|    lastActivityDate|ownerUserId|                body|score|        creationDate|viewCount|title|                tags|answerCount|acceptedAnswerId|postTypeId|  id|
+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+
|           4|2013-11-11 18:21:...|         17|&lt;p&gt;The infi...|   23|2013-11-10 19:37:...|     null| null|                    |       null|            null|         2|1165|
|           5|2013-11-10 20:31:...|         12|&lt;p&gt;Come cre...|    1|2013-11-10 19:44:...|       61| null| &lt;word-choice&gt;|          1|            null|         1|1166|
|           2|2013-11-10 20:31:...|         17|&lt;p&gt;Il verbo...|    5|2013-11-10 19:58:...|     null| null

In [58]:
itPostsDFStruct.join(itStackvotesDFStruct.withColumnRenamed('id','votdId'),\
                     itPostsDFStruct.id == itStackvotesDFStruct.postId,\
                     'inner').show()

Py4JJavaError: An error occurred while calling o249.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 27.0 failed 1 times, most recent failure: Lost task 1.0 in stage 27.0 (TID 30, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-50-0914e07ee4a8>", line 1, in <lambda>
  File "<ipython-input-48-200c9eab1c5b>", line 27, in stringToPost
NameError: global name 'toLongsafe' is not defined

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
	at sun.reflect.GeneratedMethodAccessor76.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-50-0914e07ee4a8>", line 1, in <lambda>
  File "<ipython-input-48-200c9eab1c5b>", line 27, in stringToPost
NameError: global name 'toLongsafe' is not defined

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [59]:
sc

## Registering DataFrame in the Table
Catalog and Using SQL Commands

You can reference a DataFrame by its name by registering the DataFrame as a table.

Spark stores the table defitition in the table catalog.

Save as table (2)
1. .registerTempTable(<table_name>) -- Depreciated ◦ disappear with the Spark session.

2. .write.saveAsTable(<table_name>)
register a table permanently.

The table definitions will survive your application’s restarts and are persistent.

## Example 14 & 15
Save Post DataFrame as “Posts” and Vote DataFrame as “Votes”.

In [62]:
itPostsDFStruct.write.saveAsTable("Posts")

In [67]:
resultDf = sqlContext.sql("select * from Posts")

In [68]:
resultDf.show()

+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+
|commentCount|    lastActivityDate|ownerUserId|                body|score|        creationDate|viewCount|title|                tags|answerCount|acceptedAnswerId|postTypeId|  id|
+------------+--------------------+-----------+--------------------+-----+--------------------+---------+-----+--------------------+-----------+----------------+----------+----+
|           4|2013-11-11 18:21:...|         17|&lt;p&gt;The infi...|   23|2013-11-10 19:37:...|     null| null|                    |       null|            null|         2|1165|
|           5|2013-11-10 20:31:...|         12|&lt;p&gt;Come cre...|    1|2013-11-10 19:44:...|       61| null| &lt;word-choice&gt;|          1|            null|         1|1166|
|           2|2013-11-10 20:31:...|         17|&lt;p&gt;Il verbo...|    5|2013-11-10 19:58:...|     null| null

## Loading and Saving Data using SparkSQL

Datatypes

◦ JSON

◦ Spark can automatically infer a JSON schema. ◦ ORC

◦ Optimized Row Columnar.

◦ Columnar format. ◦ Parquet

◦ Columnar file-based storage format optimized for relational access.

◦ Designed to be independent of any specific framework and free of unnecessary
dependencies.

◦ Relational Databases and Other DB with JDBC



Loading Data with SparkSQL

◦ JSON

◦ Use spark.read.json(<file_name>). ◦ ORC

◦ Use sqlContext.read.format("orc").load(<file_name>). ◦ Parquet

◦ Use spark.read.parquet(<file_name>).

◦ Relational Databases and Other DB with JDBC

◦ Use jdbc drivers.

In [69]:
world_bank = spark.read.json("./2018-msan697-example/Data/world_bank_project.json")

In [73]:
world_bank.columns

['_id',
 'approvalfy',
 'board_approval_month',
 'boardapprovaldate',
 'borrower',
 'closingdate',
 'country_namecode',
 'countrycode',
 'countryname',
 'countryshortname',
 'docty',
 'envassesmentcategorycode',
 'grantamt',
 'ibrdcommamt',
 'id',
 'idacommamt',
 'impagency',
 'lendinginstr',
 'lendinginstrtype',
 'lendprojectcost',
 'majorsector_percent',
 'mjsector_namecode',
 'mjtheme',
 'mjtheme_namecode',
 'mjthemecode',
 'prodline',
 'prodlinetext',
 'productlinetype',
 'project_abstract',
 'project_name',
 'projectdocs',
 'projectfinancialtype',
 'projectstatusdisplay',
 'regionname',
 'sector',
 'sector1',
 'sector2',
 'sector3',
 'sector4',
 'sector_namecode',
 'sectorcode',
 'source',
 'status',
 'supplementprojectflg',
 'theme1',
 'theme_namecode',
 'themecode',
 'totalamt',
 'totalcommamt',
 'url']

# Day 3 Machiene Learning
# Day 4

**Machine Learning With Spark (Spark ML)**

MainComponents

Algorithms

LogisticRegression

DecisionTree

Random Forest

K-MeanClustering

**Example 1**

Develop a Simple Linear Regression model(without stochastic gradient descent) to predict “petal_width” using “sepal_width” using “iris.csv”.

In [1]:
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *
import math
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [2]:
sqlContext.sql("DROP TABLE IF EXISTS test") #delete test table, if exists.

DataFrame[]

In [3]:
irisSchema = StructType([StructField("sepal_length", DoubleType(), True), 
                         StructField("sepal_width", DoubleType(), True),
                         StructField("petal_length", DoubleType(), True), 
                         StructField("petal_width", DoubleType(), True),
                         StructField("class", StringType(), True)])

In [4]:
iris = sqlContext.read\
    .format('com.databricks.spark.csv')\
    .options(header='false') \
    .load('./gitrepos/2018-msan697-example/Data/iris.csv', schema = irisSchema)

In [5]:
iris.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- class: string (nullable = true)



In [6]:
experiment = iris.select('petal_width','sepal_width')

In [7]:
experiment.limit(20).toPandas()

Unnamed: 0,petal_width,sepal_width
0,0.2,3.5
1,0.2,3.0
2,0.2,3.2
3,0.2,3.1
4,0.2,3.6
5,0.4,3.9
6,0.3,3.4
7,0.2,3.4
8,0.2,2.9
9,0.1,3.1


In [8]:
train , test = experiment.randomSplit([.9, .1])
train.cache()
test.write.saveAsTable("test")

In [9]:
covariance = train.cov('petal_width','sepal_width')

In [10]:
train.show()

+-----------+-----------+
|petal_width|sepal_width|
+-----------+-----------+
|        0.1|        3.0|
|        0.1|        3.0|
|        0.1|        3.1|
|        0.1|        3.1|
|        0.1|        3.1|
|        0.1|        4.1|
|        0.2|        2.9|
|        0.2|        3.0|
|        0.2|        3.0|
|        0.2|        3.1|
|        0.2|        3.1|
|        0.2|        3.2|
|        0.2|        3.2|
|        0.2|        3.2|
|        0.2|        3.2|
|        0.2|        3.2|
|        0.2|        3.3|
|        0.2|        3.4|
|        0.2|        3.4|
|        0.2|        3.4|
+-----------+-----------+
only showing top 20 rows



In [19]:
vari = train.select(variance('sepal_width')).first()[0]

In [20]:
coeff_0 = covariance/vari

In [21]:
vari

0.19772770549032065

In [22]:
coeff_0

-0.615450051361069

In [23]:
coeff_1 = train.select(mean("petal_width")).first()[0] - \
coeff_0 * train.select(mean("sepal_width")).first()[0]

In [24]:
coeff_1

3.048305084745766

In [25]:
test_output = sqlContext.sql(\
"SELECT sepal_width, petal_width, sepal_width * {0} + {1} AS prediction FROM test".format(coeff_0, coeff_1))

In [26]:
test_output.show()

+-----------+-----------+------------------+
|sepal_width|petal_width|        prediction|
+-----------+-----------+------------------+
|        3.0|        0.2|    1.201954930667|
|        3.5|        0.3|0.8942299049865001|
|        2.8|        1.3|1.3250449409392002|
|        3.0|        1.6|    1.201954930667|
|        2.5|        1.7|   1.5096799563475|
|        2.7|        1.8|   1.3865899460753|
|        3.0|        1.8|    1.201954930667|
|        3.0|        1.8|    1.201954930667|
|        2.7|        1.9|   1.3865899460753|
|        3.3|        2.1|   1.0173199152587|
|        3.0|        2.3|    1.201954930667|
|        3.2|        2.3|   1.0788649203948|
+-----------+-----------+------------------+



In [27]:
rmse = math.sqrt(test_output.rdd.map(lambda x : (x["prediction"] - x["petal_width"])**2)\
                      .reduce(lambda x,y : x + y)/test_output.count())

In [28]:
rmse

0.7399997352059987

**Machine Learning with Spark**

**MLlib**
Use RDD.
As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package.

**ML (MLlib DataFrame-based API)**
New (Spark v 1.2).
Support Pipelines of estimators, transformer and evaluators.
Use DataFrame.

Some of the algorithms are not included because they were not designed for parallel platforms.

**Machine Learning With Spark (Spark ML)**

MainComponents 

Algorithms

LogisticRegression 

DecisionTree

Random Forest

K-MeanClustering

**Spark ML**

Uses DataFrame and Dataset.
Dataset : A strongly typed collection of objects 
(This includes DataFrame.)

Main Components 

Transformers

Estimators

Evaluators

ML Parameters  

ML Pipeline

Main Components

Transformers
Convert a dataset to another. 

Types

1) Feature transformer – take a data frame output a data frame with new columns like feature vectors.

2) Learning model – take a data frame and output a data frame with predicted labels.

transform() : takes DataFrame and optional parameters.

Estimators

Evaluators

ML Parameters

ML Pipeline

Main Components

Transformers

Estimators

Algorithms that produce transformers by

fit() : takes a DataFrame and parameters.

Ex. Linear regression produces a linear regression model with fitted weights and an intercepts, which is a transformer.

Evaluators

ML Parameters

ML Pipeline

Transformers

Estimators

Evaluators

Evaluate the performance of a model.

Evaluator()

ML Parameters

ML Pipeline

Main Components

Transformers

Estimators

Evaluators

ML Parameters

Specify parameters for estimators and

Also can use ParamGridBuilder() for choosing the model produced by the best-performing set of parameters in CrossValidator().

ML Pipeline

Algorithms
Feature Extractors, Transformers, and Selectors.

Feature Extractors : TF-IDF, Word2Vec, CountVectorizer

Feature Transformer : Tokenizer StopWordsRemover n-gram Binarizer PCA

PolynomialExpansion Discrete Cosine Transform (DCT) StringIndexer

IndexToString OneHotEncoder VectorIndexer Normalizer StandardScaler

MinMaxScaler MaxAbsScaler Bucketizer ElementwiseProduct SQLTransformer

VectorAssembler QuantileDiscretizer

Feature Selectors : VectorSlicer, Rformula, ChiSqSelector


Classification and Regression

Classification : Logistic regression, Decision tree classifier, Random forest classifier, Gradient-boosted tree classifier, Multilayer perceptron classifier, One-vs-Rest classifier (a.k.a. One-vs-All), Naive Bayes

Regression: Linear regression, Decision tree regression, Random forest regression, Gradient- boosted tree regression, Survival regression

Decision trees
Tree Ensembles: Random Forests, Gradient-Boosted Trees (GBTs)
Clustering : K-means, Latent Dirichlet allocation (LDA), Bisecting k-means, Gaussian Mixture
Model (GMM)
Collaborative filtering


## Example 2

Logistic Regression Example.

Adult Data Set : Prediction task is to determine whether a person makes over 50K a year.

48842 instances.

Attribute Information:
14attributes.

Logistic Regression Example.

Use adult.dat to generate a regression model.

1. Create an RDD.

2. Convert the RDD to DataFrame.

3. Clean the data.

1) Missing data imputation.
2) Convert strings to categorical values.

4. Train the model.
5. Interpret the model parameters.
6. Evaluate the model.


In [29]:
def toDoubleSafe(v):
    try:
        return float(v)
    except ValueError:
        return str(v) #if it is not a float type return as a string.

In [32]:
sqlContext.read.csv("./gitrepos/2018-msan697-example/Data/adult.raw"\
                    ,schema =adultschema ,ignoreTrailingWhiteSpace = True )

census_raw = sc.textFile("./gitrepos/2018-msan697-example/Data/adult.raw",4)\
 .map(lambda x:x.split(", "))

In [34]:
census_raw = census_raw.map(lambda row: [toDoubleSafe(x) for x in row])

In [36]:
adultschema = StructType([
    StructField("age",DoubleType(),True),
    StructField("capital_gain",DoubleType(),True),
    StructField("capital_loss",DoubleType(),True),
    StructField("education",StringType(),True),
    StructField("fnlwgt",DoubleType(),True),
    StructField("hours_per_week",DoubleType(),True),
    StructField("income",StringType(),True),
    StructField("marital_status",StringType(),True),
    StructField("native_country",StringType(),True),
    StructField("occupation",StringType(),True),
    StructField("race",StringType(),True),
    StructField("relationship",StringType(),True),
    StructField("sex",StringType(),True),
    StructField("workclass",StringType(),True),
])

In [63]:
test = sqlContext.read.csv("./gitrepos/2018-msan697-example/Data/adult.raw"\
                    ,schema =adultschema ,ignoreTrailingWhiteSpace = True )

In [44]:
from pyspark.sql import Row
columns = ["age", "workclass", "fnlwgt", "education", "marital_status",
           "occupation", "relationship", "race", "sex", "capital_gain", "capital_loss",
           "hours_per_week", "native_country", "income"]
dfraw = sqlContext.createDataFrame(census_raw.map(lambda row: Row(**{x[0]: x[1] for x in zip(columns, row)})), \
                                    adultschema)

In [47]:
dfraw.printSchema()

root
 |-- age: double (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- education: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- income: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- race: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- workclass: string (nullable = true)



In [53]:
print census_raw.take(1)
print
print census_raw.map(lambda x: zip(columns, x)).take(1)
print 
print census_raw.map(lambda x: Row(**{x[0]:x[1] for x in zip(columns,x)})).take(1)

[[39.0, ' State-gov', 77516.0, ' Bachelors', ' Never-married', ' Adm-clerical', ' Not-in-family', ' White', ' Male', 2174.0, 0.0, 40.0, ' United-States', ' <=50K']]

[[('age', 39.0), ('workclass', ' State-gov'), ('fnlwgt', 77516.0), ('education', ' Bachelors'), ('marital_status', ' Never-married'), ('occupation', ' Adm-clerical'), ('relationship', ' Not-in-family'), ('race', ' White'), ('sex', ' Male'), ('capital_gain', 2174.0), ('capital_loss', 0.0), ('hours_per_week', 40.0), ('native_country', ' United-States'), ('income', ' <=50K')]]

[Row(age=39.0, capital_gain=2174.0, capital_loss=0.0, education=' Bachelors', fnlwgt=77516.0, hours_per_week=40.0, income=' <=50K', marital_status=' Never-married', native_country=' United-States', occupation=' Adm-clerical', race=' White', relationship=' Not-in-family', sex=' Male', workclass=' State-gov')]


In [69]:
test_dict = {'hi':1,'bye':3}
**test_dict

SyntaxError: invalid syntax (<ipython-input-69-dcd5540dab25>, line 2)

In [48]:
type(census_raw)

pyspark.rdd.PipelinedRDD

In [49]:
columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_per_week',
 'native_country',
 'income']

In [51]:
zip(columns,columns[:4])

[('age', 'age'),
 ('workclass', 'workclass'),
 ('fnlwgt', 'fnlwgt'),
 ('education', 'education')]

In [54]:
#Check the most commonly used vals.
print dfraw.groupBy(dfraw["workclass"]).count().orderBy("count",ascending=False).show()
print dfraw.groupBy(dfraw["occupation"]).count().orderBy("count",ascending=False).show()
print dfraw.groupBy(dfraw["native_country"]).count().orderBy("count",ascending=False).show()

+-----------------+-----+
|        workclass|count|
+-----------------+-----+
|          Private|33906|
| Self-emp-not-inc| 3862|
|        Local-gov| 3136|
|                ?| 2799|
|        State-gov| 1981|
|     Self-emp-inc| 1695|
|      Federal-gov| 1432|
|      Without-pay|   21|
|     Never-worked|   10|
+-----------------+-----+

None
+------------------+-----+
|        occupation|count|
+------------------+-----+
|    Prof-specialty| 6172|
|      Craft-repair| 6112|
|   Exec-managerial| 6086|
|      Adm-clerical| 5611|
|             Sales| 5504|
|     Other-service| 4923|
| Machine-op-inspct| 3022|
|                 ?| 2809|
|  Transport-moving| 2355|
| Handlers-cleaners| 2072|
|   Farming-fishing| 1490|
|      Tech-support| 1446|
|   Protective-serv|  983|
|   Priv-house-serv|  242|
|      Armed-Forces|   15|
+------------------+-----+

None
+-------------------+-----+
|     native_country|count|
+-------------------+-----+
|      United-States|43832|
|             Mexico|  95

In [60]:
#Missing data imputation - Impute the most common row for "?"
dfrawrp = dfraw.na.replace(["?"], ["Private"], ["workclass"])
dfrawrpl = dfrawrp.na.replace(["?"], ["Prof-specialty"], ["occupation"])
dfrawnona = dfrawrpl.na.replace(["?"], ["United-States"], ["native_country"])


In [62]:
dfrawrp.show()

+----+------------+------------+-------------+--------+--------------+------+--------------------+--------------+------------------+-------------------+--------------+-------+-----------------+
| age|capital_gain|capital_loss|    education|  fnlwgt|hours_per_week|income|      marital_status|native_country|        occupation|               race|  relationship|    sex|        workclass|
+----+------------+------------+-------------+--------+--------------+------+--------------------+--------------+------------------+-------------------+--------------+-------+-----------------+
|39.0|      2174.0|         0.0|    Bachelors| 77516.0|          40.0| <=50K|       Never-married| United-States|      Adm-clerical|              White| Not-in-family|   Male|        State-gov|
|50.0|         0.0|         0.0|    Bachelors| 83311.0|          13.0| <=50K|  Married-civ-spouse| United-States|   Exec-managerial|              White|       Husband|   Male| Self-emp-not-inc|
|38.0|         0.0|         0.

In [71]:
#Level Encoding

from pyspark.ml.feature import StringIndexer

def indexStringColumns(df, cols):
    
    newdf = df
    
    for c in cols:
        #For each given colum, fits StringIndexerModel.
        si = StringIndexer(inputCol=c, outputCol=c+"-num")
        sm = si.fit(newdf)
         #Creates a DataFame by putting the transformed values in the new colum with suffix "-num" 
        #and then drops the original columns.
        #and drop the "-num" suffix. 
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-num",c)
    return newdf
dfnumeric = indexStringColumns(dfrawnona, ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country", "income"])

In [72]:
??StringIndexer

In [77]:
dfnumeric.limit(10).toPandas()

Unnamed: 0,age,capital_gain,capital_loss,fnlwgt,hours_per_week,workclass,education,marital_status,occupation,relationship,race,sex,native_country,income
0,39.0,2174.0,0.0,77516.0,40.0,4.0,2.0,1.0,3.0,1.0,0.0,0.0,0.0,0.0
1,50.0,0.0,0.0,83311.0,13.0,1.0,2.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0
2,38.0,0.0,0.0,215646.0,40.0,0.0,0.0,2.0,9.0,1.0,0.0,0.0,0.0,0.0
3,53.0,0.0,0.0,234721.0,40.0,0.0,5.0,0.0,9.0,0.0,1.0,0.0,0.0,0.0
4,28.0,0.0,0.0,338409.0,40.0,0.0,2.0,0.0,0.0,4.0,1.0,1.0,9.0,0.0
5,37.0,0.0,0.0,284582.0,40.0,0.0,3.0,0.0,2.0,4.0,0.0,1.0,0.0,0.0
6,49.0,0.0,0.0,160187.0,16.0,0.0,10.0,5.0,5.0,1.0,1.0,1.0,13.0,0.0
7,52.0,0.0,0.0,209642.0,45.0,1.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,1.0
8,31.0,14084.0,0.0,45781.0,50.0,0.0,3.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0
9,42.0,5178.0,0.0,159449.0,40.0,0.0,2.0,0.0,2.0,0.0,0.0,0.0,0.0,1.0


In [78]:
dfrawnona.limit(10).toPandas()

Unnamed: 0,age,capital_gain,capital_loss,education,fnlwgt,hours_per_week,income,marital_status,native_country,occupation,race,relationship,sex,workclass
0,39.0,2174.0,0.0,Bachelors,77516.0,40.0,<=50K,Never-married,United-States,Adm-clerical,White,Not-in-family,Male,State-gov
1,50.0,0.0,0.0,Bachelors,83311.0,13.0,<=50K,Married-civ-spouse,United-States,Exec-managerial,White,Husband,Male,Self-emp-not-inc
2,38.0,0.0,0.0,HS-grad,215646.0,40.0,<=50K,Divorced,United-States,Handlers-cleaners,White,Not-in-family,Male,Private
3,53.0,0.0,0.0,11th,234721.0,40.0,<=50K,Married-civ-spouse,United-States,Handlers-cleaners,Black,Husband,Male,Private
4,28.0,0.0,0.0,Bachelors,338409.0,40.0,<=50K,Married-civ-spouse,Cuba,Prof-specialty,Black,Wife,Female,Private
5,37.0,0.0,0.0,Masters,284582.0,40.0,<=50K,Married-civ-spouse,United-States,Exec-managerial,White,Wife,Female,Private
6,49.0,0.0,0.0,9th,160187.0,16.0,<=50K,Married-spouse-absent,Jamaica,Other-service,Black,Not-in-family,Female,Private
7,52.0,0.0,0.0,HS-grad,209642.0,45.0,>50K,Married-civ-spouse,United-States,Exec-managerial,White,Husband,Male,Self-emp-not-inc
8,31.0,14084.0,0.0,Masters,45781.0,50.0,>50K,Never-married,United-States,Prof-specialty,White,Not-in-family,Female,Private
9,42.0,5178.0,0.0,Bachelors,159449.0,40.0,>50K,Married-civ-spouse,United-States,Exec-managerial,White,Husband,Male,Private


In [80]:
#One-hot encoding
from pyspark.ml.feature import OneHotEncoder
def oneHotEncodeColumns(df, cols):
    newdf = df
    for c in cols:
        #For each given colum, create OneHotEncoder. 
        #dropLast : Whether to drop the last category in the encoded vector (default: true)
        onehotenc = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=False)
        #Creates a DataFame by putting the transformed values in the new colum with suffix "-onehot" 
        #and then drops the original columns.
        #and drop the "-onehot" suffix. 
        newdf = onehotenc.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-onehot", c)
    return newdf

dfhot = oneHotEncodeColumns(dfnumeric, ["workclass", "education", "marital_status", "occupation", "relationship", "race", "native_country"])

In [81]:
dfhot.limit(10).toPandas()

Unnamed: 0,age,capital_gain,capital_loss,fnlwgt,hours_per_week,sex,income,workclass,education,marital_status,occupation,relationship,race,native_country
0,39.0,2174.0,0.0,77516.0,40.0,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,50.0,0.0,0.0,83311.0,13.0,0.0,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,38.0,0.0,0.0,215646.0,40.0,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,53.0,0.0,0.0,234721.0,40.0,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,28.0,0.0,0.0,338409.0,40.0,1.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
5,37.0,0.0,0.0,284582.0,40.0,1.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
6,49.0,0.0,0.0,160187.0,16.0,1.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
7,52.0,0.0,0.0,209642.0,45.0,0.0,1.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
8,31.0,14084.0,0.0,45781.0,50.0,1.0,1.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
9,42.0,5178.0,0.0,159449.0,40.0,0.0,1.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [82]:
dfnumeric.limit(10).toPandas()

Unnamed: 0,age,capital_gain,capital_loss,fnlwgt,hours_per_week,workclass,education,marital_status,occupation,relationship,race,sex,native_country,income
0,39.0,2174.0,0.0,77516.0,40.0,4.0,2.0,1.0,3.0,1.0,0.0,0.0,0.0,0.0
1,50.0,0.0,0.0,83311.0,13.0,1.0,2.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0
2,38.0,0.0,0.0,215646.0,40.0,0.0,0.0,2.0,9.0,1.0,0.0,0.0,0.0,0.0
3,53.0,0.0,0.0,234721.0,40.0,0.0,5.0,0.0,9.0,0.0,1.0,0.0,0.0,0.0
4,28.0,0.0,0.0,338409.0,40.0,0.0,2.0,0.0,0.0,4.0,1.0,1.0,9.0,0.0
5,37.0,0.0,0.0,284582.0,40.0,0.0,3.0,0.0,2.0,4.0,0.0,1.0,0.0,0.0
6,49.0,0.0,0.0,160187.0,16.0,0.0,10.0,5.0,5.0,1.0,1.0,1.0,13.0,0.0
7,52.0,0.0,0.0,209642.0,45.0,1.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,1.0
8,31.0,14084.0,0.0,45781.0,50.0,0.0,3.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0
9,42.0,5178.0,0.0,159449.0,40.0,0.0,2.0,0.0,2.0,0.0,0.0,0.0,0.0,1.0


In [83]:
# Merging the data with Vector Assembler.
from pyspark.ml.feature import VectorAssembler
input_cols=["age","capital_gain","capital_loss","fnlwgt","hours_per_week","sex","workclass","education","marital_status","occupation","relationship","native_country","race"]

#VectorAssembler takes a number of collumn names(inputCols) and output column name (outputCol)
#and transforms a DataFrame to assemble the values in inputCols into one single vector with outputCol.
va = VectorAssembler(outputCol="features", inputCols=input_cols)
#lpoints - labeled data.
lpoints = va.transform(dfhot).select("features", "income").withColumnRenamed("income", "label")

In [84]:
lpoints.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(106,[0,1,3,4,10,...|  0.0|
|(106,[0,3,4,7,17,...|  0.0|
|(106,[0,3,4,6,15,...|  0.0|
|(106,[0,3,4,6,20,...|  0.0|
|(106,[0,3,4,5,6,1...|  0.0|
|(106,[0,3,4,5,6,1...|  0.0|
|(106,[0,3,4,5,6,2...|  0.0|
|(106,[0,3,4,7,15,...|  1.0|
|(106,[0,1,3,4,5,6...|  1.0|
|(106,[0,1,3,4,6,1...|  1.0|
|(106,[0,3,4,6,16,...|  1.0|
|(106,[0,3,4,10,17...|  1.0|
|(106,[0,3,4,5,6,1...|  0.0|
|(106,[0,3,4,6,21,...|  0.0|
|(106,[0,3,4,6,19,...|  1.0|
|(106,[0,3,4,6,23,...|  0.0|
|(106,[0,3,4,7,15,...|  0.0|
|(106,[0,3,4,6,15,...|  0.0|
|(106,[0,3,4,6,20,...|  0.0|
|(106,[0,3,4,5,7,1...|  1.0|
+--------------------+-----+
only showing top 20 rows



In [85]:
#Divide the dataset into training and testing sets.
splits = lpoints.randomSplit([0.8, 0.2])

#cache() : the algorithm is interative and training and data sets are going to be reused many times.
adulttrain = splits[0].cache()
adultvalid = splits[1].cache()

In [86]:
#Train the model.
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(regParam=0.01, maxIter=1000, fitIntercept=True)
lrmodel = lr.fit(adulttrain)
#The above lines are same as..
#lr = LogisticRegression()
#lrmodel = lr.setParams(regParam=0.01, maxIter=1000, fitIntercept=True).fit(adulttrain)

In [87]:
#Interpret the model parameters
print(lrmodel.coefficients)
print(lrmodel.intercept)

[0.0221929753189,0.000142772621804,0.000576070680458,6.02925645705e-07,0.0248665300409,-0.515702897584,0.102132093255,-0.35300974575,0.0190835734907,-0.304297195192,-0.142206924083,0.29731392433,0.529140084555,-0.056001807522,-1.00141939673,-0.330962625191,0.0118503402111,0.702395778339,1.00279071118,0.128539648301,-0.83641488146,0.249338395296,-1.06143670914,-1.30407659952,1.41889968822,-1.24528571992,-0.677829837565,1.48168803259,-1.01515651702,-1.46468302736,-1.63383816805,0.84674840526,-0.702861492797,-0.309123909033,-0.337367394988,-0.201717853902,-0.168251123043,0.506600733933,0.476424513926,0.0280995772152,0.698427317262,-0.0371018173701,0.208784403615,-0.769461443736,-0.342675564287,-0.307403523606,-0.132817194102,-0.618790392607,-0.883587940191,0.450245136122,0.364881142473,-1.00026943042,-0.0397454004051,0.4475426548,-0.0920593667034,-0.763223150994,-0.336046245131,1.26807352189,-0.545391126326,0.204425211157,-0.59292711581,-0.00878009181512,0.297201419278,0.144827600423,-0.1

In [88]:
#Evaluate models using test dataset.
#First, transform the validation set.
validpredicts = lrmodel.transform(adultvalid)
validpredicts.show()

#rawPrediction : includes two values - log-odds that a sample doesn't and does belong to the category (making > 50,000).
#probability : the probability that the sample is not in the category.
#prediction : proability that the sample belongs to the category.
#validpredicts.select("rawPrediction").collect()
#validpredicts.select("probability").collect()



+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(106,[0,1,3,4,5,6...|  1.0|[0.34445588031051...|[0.58527250184974...|       0.0|
|(106,[0,1,3,4,5,6...|  1.0|[-0.1533225960752...|[0.46174426404340...|       1.0|
|(106,[0,1,3,4,5,6...|  0.0|[-0.3091073743654...|[0.42333263359954...|       1.0|
|(106,[0,1,3,4,5,6...|  0.0|[0.27984102857285...|[0.56950724954511...|       0.0|
|(106,[0,1,3,4,5,6...|  0.0|[0.33335325431136...|[0.58257505088083...|       0.0|
|(106,[0,1,3,4,5,6...|  1.0|[0.67395352250320...|[0.66238785299717...|       0.0|
|(106,[0,1,3,4,5,6...|  0.0|[3.14486579932169...|[0.95870594266531...|       0.0|
|(106,[0,1,3,4,5,6...|  0.0|[4.02017847499292...|[0.98236675152685...|       0.0|
|(106,[0,1,3,4,5,6...|  0.0|[1.82171196624299...|[0.86077142263385...|       0.0|
|(106,[0,1,3,4,5

In [89]:
#Evaluate the model. default metric : Area Under ROC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
bceval = BinaryClassificationEvaluator()
print (bceval.getMetricName() +":" + str(bceval.evaluate(validpredicts)))

areaUnderROC:0.905398957308


In [90]:
#Evaluate the model. metric : Area Under PR
bceval.setMetricName("areaUnderPR")
print (bceval.getMetricName() +":" + str(bceval.evaluate(validpredicts)))

areaUnderPR:0.757285600517


In [91]:
# n-fold validation and the results.
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
cv = CrossValidator().setEstimator(lr).setEvaluator(bceval).setNumFolds(5)
#ParamGridBuilder() – combinations of parameters and their values.
paramGrid = ParamGridBuilder().addGrid(lr.maxIter, [1000]).addGrid(lr.regParam, [0.0001, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5]).build()
#setEstimatorParamMaps() takes ParamGridBuilder().
cv.setEstimatorParamMaps(paramGrid)
cvmodel = cv.fit(adulttrain)

In [92]:
print cvmodel.bestModel.coefficients
print cvmodel.bestModel.intercept
print cvmodel.bestModel._java_obj.getMaxIter()
print cvmodel.bestModel._java_obj.getRegParam()

[0.0250291839694,0.00031090263033,0.000674272339803,7.06428108998e-07,0.0280638617094,-0.694609639324,-0.31271779459,-0.847671760186,-0.42905225614,-0.660669891177,-0.597808445872,-0.173241071517,0.119738919351,-0.504738775479,-4.22964183786,-0.60027173448,-0.218922766314,0.527308531124,0.850302151463,-0.118155924918,-1.28465930533,0.0415212601456,-1.53438031929,-1.81512774223,1.33018259714,-1.75498455386,-1.0492093332,1.40557705774,-1.43359332864,-2.19938951541,-5.51072018047,1.3303699311,-1.54427707051,-1.10108994861,-1.12728456265,-0.952728580053,-0.958763771149,1.0575283329,0.260075555145,-0.163621926165,0.52419236519,-0.239099746237,0.00881332602577,-1.13627382723,-0.573667076055,-0.676136186413,-0.334840074063,-0.907241461248,-1.25196345937,0.285647958548,0.212054663636,-2.01267056708,-0.17343884178,-0.494460123442,0.189019616148,-0.981359393594,-0.0474258874449,0.5928386983,-0.912702734011,-0.884036386224,-1.76120797787,-1.12175997073,-0.832311896299,-0.961844924821,-1.245163356

In [93]:
BinaryClassificationEvaluator().evaluate(cvmodel.bestModel.transform(adultvalid))

0.9075763535587398

In [94]:
BinaryClassificationEvaluator().setMetricName("areaUnderPR").evaluate(cvmodel.bestModel.transform(adultvalid))

0.76678885749796

## Spark ML – Example 3
Decision Tree 

Pros
Do not require data normalization, can handle numerical/categorical values, and work with missing values.
Cons
Prone to overfitting and is sensitive to the input data

In [153]:
pen_raw = sc.textFile("./gitrepos/2018-msan697-example/Data/penbased.dat", 4)

In [154]:
pen_raw = pen_raw.map(lambda x:x.split(", "))\
    .map(lambda row: [float(numbers) for numbers in row])

In [108]:
#Create a DataFrame
from pyspark.sql.types import *
from pyspark.sql import Row
penschema = StructType([
    StructField("pix1",DoubleType(),True),
    StructField("pix2",DoubleType(),True),
    StructField("pix3",DoubleType(),True),
    StructField("pix4",DoubleType(),True),
    StructField("pix5",DoubleType(),True),
    StructField("pix6",DoubleType(),True),
    StructField("pix7",DoubleType(),True),
    StructField("pix8",DoubleType(),True),
    StructField("pix9",DoubleType(),True),
    StructField("pix10",DoubleType(),True),
    StructField("pix11",DoubleType(),True),
    StructField("pix12",DoubleType(),True),
    StructField("pix13",DoubleType(),True),
    StructField("pix14",DoubleType(),True),
    StructField("pix15",DoubleType(),True),
    StructField("pix16",DoubleType(),True),
    StructField("label",DoubleType(),True)
])

dfpen = sqlContext.createDataFrame(pen_raw.map(lambda x : Row(x[0],x[1],x[2],x[3],x[4],x[5],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],x[15],x[16])), penschema)

In [111]:
# Merging the data with Vector Assembler.
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(outputCol="features", inputCols=dfpen.columns[0:-1]) #except the last col.
penlpoints = va.transform(dfpen).select("features", "label")

In [112]:
penlpoints.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[47.0,100.0,27.0,...|  8.0|
|[0.0,89.0,27.0,10...|  2.0|
|[0.0,57.0,31.0,68...|  1.0|
|[0.0,100.0,7.0,92...|  4.0|
|[0.0,67.0,49.0,83...|  1.0|
|[100.0,100.0,88.0...|  6.0|
|[0.0,100.0,3.0,72...|  4.0|
|[0.0,39.0,2.0,62....|  0.0|
|[13.0,89.0,12.0,5...|  5.0|
|[74.0,87.0,31.0,1...|  9.0|
|[48.0,96.0,62.0,6...|  8.0|
|[100.0,100.0,72.0...|  5.0|
|[91.0,74.0,54.0,1...|  9.0|
|[0.0,85.0,38.0,10...|  7.0|
|[35.0,76.0,57.0,1...|  3.0|
|[50.0,84.0,66.0,1...|  3.0|
|[99.0,80.0,63.0,1...|  9.0|
|[24.0,66.0,43.0,1...|  2.0|
|[0.0,73.0,19.0,99...|  2.0|
|[12.0,77.0,20.0,6...|  5.0|
+--------------------+-----+
only showing top 20 rows



In [169]:
#Create a DataFrame
from pyspark.sql.types import *
from pyspark.sql import Row
penschema = StructType([
    StructField("features", ArrayType(elementType=FloatType(),containsNull=False),True),
    StructField("label", DoubleType(),True)
])
dfpen1 = sqlContext.createDataFrame(pen_raw.map(lambda x : Row(x[0:16],x[16])), penschema)


from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
dfpen1 = dfpen1.select(list_to_vector_udf(dfpen1["features"])\
                                .alias("features"),'label')

pendtsets = dfpen1.randomSplit([0.8, 0.2])
pendttrain = pendtsets[0].cache()
pendtvalid = pendtsets[1].cache()

In [170]:
pendttrain.count(), pendtvalid.count(), dfpen1.count()

(7959, 1953, 9912)

In [171]:
# Train the data.
from pyspark.ml.classification import DecisionTreeClassifier
# Paramenters
#maxDepth : maximum tree depth (default : 5).
#maxBins : maximum number of bins when binning continuous features (default : 32).
#minInstancesPerNode : minimum number of dataset samples each branch needs to have after a split (default : 1).
#minInfoGain : minimum information gain for a split (default : 0).
dt = DecisionTreeClassifier(maxDepth=20, maxBins= 32, minInstancesPerNode=1, minInfoGain = 0)
dtmodel = dt.fit(pendttrain)

In [145]:
print dtmodel._call_java('toDebugString')

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4ad0b870ebb84b5dc3c1) of depth 20 with 617 nodes
  If (feature 13 <= 57.0)
   If (feature 4 <= 41.0)
    If (feature 9 <= 20.0)
     If (feature 14 <= 63.0)
      If (feature 10 <= 12.0)
       Predict: 8.0
      Else (feature 10 > 12.0)
       If (feature 12 <= 8.0)
        Predict: 8.0
       Else (feature 12 > 8.0)
        If (feature 5 <= 31.0)
         Predict: 4.0
        Else (feature 5 > 31.0)
         If (feature 0 <= 22.0)
          If (feature 7 <= 13.0)
           Predict: 6.0
          Else (feature 7 > 13.0)
           If (feature 0 <= 16.0)
            Predict: 1.0
           Else (feature 0 > 16.0)
            Predict: 4.0
         Else (feature 0 > 22.0)
          Predict: 6.0
     Else (feature 14 > 63.0)
      If (feature 15 <= 26.0)
       If (feature 10 <= 12.0)
        Predict: 1.0
       Else (feature 10 > 12.0)
        If (feature 10 <= 45.0)
         Predict: 2.0
        Else (feature 10 > 45.0)
      

In [146]:
#Test data.
dtpredicts = dtmodel.transform(pendtvalid)

In [147]:
dtpredicts.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[0.0,26.0,57.0,56...|  8.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       9.0|
|[0.0,31.0,52.0,59...|  8.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       8.0|
|[0.0,34.0,34.0,55...|  1.0|[0.0,394.0,0.0,0....|[0.0,1.0,0.0,0.0,...|       1.0|
|[0.0,34.0,43.0,67...|  1.0|[0.0,115.0,0.0,0....|[0.0,1.0,0.0,0.0,...|       1.0|
|[0.0,42.0,35.0,71...|  1.0|[0.0,394.0,0.0,0....|[0.0,1.0,0.0,0.0,...|       1.0|
|[0.0,42.0,42.0,55...|  1.0|[0.0,394.0,0.0,0....|[0.0,1.0,0.0,0.0,...|       1.0|
|[0.0,43.0,26.0,65...|  1.0|[0.0,394.0,0.0,0....|[0.0,1.0,0.0,0.0,...|       1.0|
|[0.0,43.0,35.0,60...|  1.0|[0.0,2.0,0.0,0.0,...|[0.0,1.0,0.0,0.0,...|       1.0|
|[0.0,44.0,30.0,61...|  1.0|[0.0,394.0,0.0,0....|[0.0,1.0,0.0,0.0,...|       1.0|
|[0.0,46.0,52.0,

In [148]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(dtpredicts)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.041041


In [149]:
dtpredicts.select('label','prediction').rdd.map(lambda x : (x,1)).countByKey() 

defaultdict(int,
            {Row(label=0.0, prediction=0.0): 212,
             Row(label=0.0, prediction=4.0): 1,
             Row(label=0.0, prediction=6.0): 2,
             Row(label=0.0, prediction=8.0): 3,
             Row(label=0.0, prediction=9.0): 1,
             Row(label=1.0, prediction=0.0): 1,
             Row(label=1.0, prediction=1.0): 212,
             Row(label=1.0, prediction=2.0): 6,
             Row(label=1.0, prediction=3.0): 3,
             Row(label=1.0, prediction=4.0): 1,
             Row(label=1.0, prediction=6.0): 1,
             Row(label=1.0, prediction=7.0): 1,
             Row(label=1.0, prediction=9.0): 3,
             Row(label=2.0, prediction=1.0): 4,
             Row(label=2.0, prediction=2.0): 208,
             Row(label=2.0, prediction=3.0): 1,
             Row(label=2.0, prediction=7.0): 1,
             Row(label=2.0, prediction=8.0): 1,
             Row(label=3.0, prediction=1.0): 1,
             Row(label=3.0, prediction=3.0): 163,
             Ro

In [150]:
#Depreciated in Spark 2.0 -- Use accuracy
from pyspark.mllib.evaluation import MulticlassMetrics
dtresrdd = dtpredicts.select("prediction", "label").rdd #convert DataFrame to RDD.
dtmm = MulticlassMetrics(dtresrdd) 
print dtmm.precision() 
print(dtmm.confusionMatrix())



0.958958958959
DenseMatrix([[ 212.,    0.,    0.,    0.,    1.,    0.,    2.,    0.,    3.,
                 1.],
             [   1.,  212.,    6.,    3.,    1.,    0.,    1.,    1.,    0.,
                 3.],
             [   0.,    4.,  208.,    1.,    0.,    0.,    0.,    1.,    1.,
                 0.],
             [   0.,    1.,    0.,  163.,    0.,    3.,    0.,    0.,    0.,
                 0.],
             [   0.,    0.,    0.,    1.,  188.,    1.,    0.,    0.,    0.,
                 2.],
             [   0.,    1.,    0.,    0.,    1.,  167.,    0.,    0.,    2.,
                 3.],
             [   0.,    4.,    0.,    0.,    0.,    1.,  186.,    2.,    0.,
                 1.],
             [   0.,    2.,    0.,    2.,    0.,    0.,    0.,  192.,    3.,
                 0.],
             [   4.,    1.,    0.,    0.,    1.,    0.,    0.,    1.,  209.,
                 1.],
             [   0.,    2.,    0.,    2.,    1.,    9.,    0.,    0.,    0.,
               17

In [151]:
# n-fold validation and the results.
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
cv = CrossValidator().setEstimator(dt).setEvaluator(evaluator).setNumFolds(5)
#ParamGridBuilder() – combinations of parameters and their values.
paramGrid = ParamGridBuilder().addGrid(dt.maxDepth, [5,10,15,20,25,30]).build()
#setEstimatorParamMaps() takes ParamGridBuilder().
cv.setEstimatorParamMaps(paramGrid)
cvmodel = cv.fit(pendttrain)
print cvmodel.bestModel._java_obj.getMaxDepth()
print "Accuracy : " +  str(MulticlassClassificationEvaluator().evaluate(cvmodel.bestModel.transform(pendtvalid)))

15
Accuracy : 0.959474397503


## Spark ML – Example 4
Random Forest Example

In [176]:
# Train the model.
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(maxDepth=20)
rfmodel = rf.fit(pendttrain)
print rfmodel._call_java('toDebugString')

RandomForestClassificationModel (uid=RandomForestClassifier_47389a77b6adfc387ccc) with 20 trees
  Tree 0 (weight 1.0):
    If (feature 15 <= 51.0)
     If (feature 1 <= 99.0)
      If (feature 14 <= 80.0)
       If (feature 7 <= 77.0)
        If (feature 0 <= 65.0)
         If (feature 10 <= 73.0)
          If (feature 2 <= 28.0)
           If (feature 5 <= 23.0)
            Predict: 0.0
           Else (feature 5 > 23.0)
            If (feature 8 <= 82.0)
             If (feature 15 <= 37.0)
              Predict: 7.0
             Else (feature 15 > 37.0)
              Predict: 8.0
            Else (feature 8 > 82.0)
             Predict: 4.0
          Else (feature 2 > 28.0)
           If (feature 0 <= 10.0)
            If (feature 9 <= 40.0)
             If (feature 6 <= 0.0)
              Predict: 8.0
             Else (feature 6 > 0.0)
              If (feature 11 <= 7.0)
               Predict: 7.0
              Else (feature 11 > 7.0)
               If (feature 1 <= 74.0)
      

In [173]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
rfpredicts = rfmodel.transform(pendtvalid)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(rfpredicts)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.0128008


In [174]:
#Confusion Matrix
rfpredicts.select('label','prediction').rdd.map(lambda x : (x,1)).countByKey()

defaultdict(int,
            {Row(label=0.0, prediction=0.0): 193,
             Row(label=0.0, prediction=9.0): 1,
             Row(label=1.0, prediction=1.0): 198,
             Row(label=1.0, prediction=2.0): 6,
             Row(label=1.0, prediction=3.0): 2,
             Row(label=1.0, prediction=9.0): 1,
             Row(label=2.0, prediction=1.0): 1,
             Row(label=2.0, prediction=2.0): 200,
             Row(label=3.0, prediction=1.0): 1,
             Row(label=3.0, prediction=3.0): 194,
             Row(label=3.0, prediction=5.0): 1,
             Row(label=3.0, prediction=7.0): 1,
             Row(label=4.0, prediction=4.0): 183,
             Row(label=4.0, prediction=9.0): 3,
             Row(label=5.0, prediction=5.0): 159,
             Row(label=5.0, prediction=8.0): 1,
             Row(label=5.0, prediction=9.0): 2,
             Row(label=6.0, prediction=6.0): 198,
             Row(label=7.0, prediction=1.0): 1,
             Row(label=7.0, prediction=7.0): 218,
       

In [175]:
# Alternative : Evaluate with the test data set. (Using MLlib)
from pyspark.mllib.evaluation import MulticlassMetrics
rfpredicts = rfmodel.transform(pendtvalid)
rfresrdd = rfpredicts.select("prediction", "label").rdd
rfmm = MulticlassMetrics(rfresrdd)
#rfmm.precision()
print(rfmm.confusionMatrix())

DenseMatrix([[ 193.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
                 1.],
             [   0.,  198.,    6.,    2.,    0.,    0.,    0.,    0.,    0.,
                 1.],
             [   0.,    1.,  200.,    0.,    0.,    0.,    0.,    0.,    0.,
                 0.],
             [   0.,    1.,    0.,  194.,    0.,    1.,    0.,    1.,    0.,
                 0.],
             [   0.,    0.,    0.,    0.,  183.,    0.,    0.,    0.,    0.,
                 3.],
             [   0.,    0.,    0.,    0.,    0.,  159.,    0.,    0.,    1.,
                 2.],
             [   0.,    0.,    0.,    0.,    0.,    0.,  198.,    0.,    0.,
                 0.],
             [   0.,    1.,    0.,    0.,    0.,    0.,    0.,  218.,    0.,
                 1.],
             [   1.,    0.,    0.,    0.,    0.,    0.,    0.,    1.,  207.,
                 0.],
             [   0.,    1.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
               178.]])


## Example 5

**K-mean clustering**

Unsupervised learning

Dataset should be standardized.

Example – partition data into groups, anomaly detection, text/topic categorization

**K-mean clustering**

Kmeans

Parameters

k : Number of clusters to find (default – 2).

maxIter : Maximum number of iterations (default – 20). ◦ tol : Convergence tolerance (default – 0.0001).

seed : Random seed value for cluster initialization.

## Day5

## Load data from mongodb

In [1]:
spark.read.format("com.mongodb.spark.sql.DefaultSource")\
.option("uri","mongodb://127.0.0.1/mydb.friend").load()

Py4JJavaError: An error occurred while calling o30.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql.DefaultSource. Please find packages at http://spark.apache.org/third-party-projects.html
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:549)
	at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
	at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:301)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.sql.DefaultSource.DefaultSource
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21$$anonfun$apply$12.apply(DataSource.scala:533)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21$$anonfun$apply$12.apply(DataSource.scala:533)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21.apply(DataSource.scala:533)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21.apply(DataSource.scala:533)
	at scala.util.Try.orElse(Try.scala:84)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:533)
	... 16 more


mongoimport --db msan697 --collection business --file /Users/siavashmortezavi/Documents/class_topics/Distributed_computing/Distributed_computing_2/gitrepos/2018-msan697-example/Data/business.json

msabn697 is the database , business is the collection or table






In [2]:
db = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
.option("uri","mongodb://127.0.0.1/msan697.business").load()

Py4JJavaError: An error occurred while calling o52.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql.DefaultSource. Please find packages at http://spark.apache.org/third-party-projects.html
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:549)
	at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
	at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:301)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.sql.DefaultSource.DefaultSource
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21$$anonfun$apply$12.apply(DataSource.scala:533)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21$$anonfun$apply$12.apply(DataSource.scala:533)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21.apply(DataSource.scala:533)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21.apply(DataSource.scala:533)
	at scala.util.Try.orElse(Try.scala:84)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:533)
	... 16 more


In [3]:
db.show()

NameError: name 'db' is not defined

In [4]:
db.printSchema()

NameError: name 'db' is not defined

# NoSQL Interview Questions

What is NoSQL?

Relational Database vs. NoSQL 

Impedance mismatch

Polyglot persistence 

Aggregate-oriented database 

Key-value database

Document database

Column family database

Graph database

Replication vs sharding

CAP Theorem

Eventual Consistency

# MongoDB Interview Questions

MongoDB’s type

MongoDB’s characteristics

Alternative databases

Supported programming languages
Index

Aggregation Operations(aggregation pipeline)

Sharding Replication ObjectId

Consistency

## Why NoSQL?

**Relational Model**

Database is a collection of relations

**Relational Model**

Database is a collection of relations
Each relation has attributes

Database is a collection of relations

Each relation has attributes 

Each relation has a collectin of tuples

## Example: SQL (Structured Query Language)

Manage data in a relational database

Selects rows from a relation satisfying a given condition.

Basic structure

SELECT, FROM, WHERE

SELECT DATA _FIELD FROM TABLE

WHERE CONDITIONS

Ex) CUSTOMER

EX) SELECT *

FROM Customer

WHERE Name = “Diane Woodbridge”


**Pros**

Concurrency Control - ACID (Atomic, Consistent, Isolated and Durable) Transaction Management

**Atomicity** :

An operation either succeeds or fails entirely.
Many rows spanning many tables are updated as a single operation.

**Consistency**:

Any given transaction must change affected data only in allowed ways.


**Isolation:**

Defines how/when the changes made by one operation become visible to  other. Concurrent operations are isolated from each other so that they can’t see a partial update.

**Durability:**
Once a transaction has been committed, it will remain permanently.

**Standard Model**

Different vendors’ query languages are similar.

Transaction operations work in the similar way.

## 21st Century and Relational Databases

New and Large Volumes of Data

More internet users and websites started tracking user activity.èData and traffic increase. Schema changes.

As a solution, started to store data in the clusters.èReliability issues.
Most relational databases are not designed to run on clusters.

**Lots of data**
Scale out(distributed) or scale up(better machines)

**sharding** (breaking data into pieces across machines)
Problems: Reliability Issue. 

**JSON (JavaScript Object Notation)**

Open-standard file format that uses human-readable text to transmit data objects consisting of key-value pairs and array data types (or any other serializable value).

Commonly used for browser–server communication for REST API.

**viewing json in terminal**

cat jaa.usfca.edu.har | jq '.' | more

**What if a table schema changes all the time??**


## NoSQL (Not Only SQL)

Why NoSQL?

**Impedance Mismatch**

Relation model ≠ In-memory data structure (object)

For application Development Productivity,
    Better mapping with in-memory data structures for the application.

Large volumes of data (2000s)
    Scaling up vs. Scaling out?
    Run large data on clusters of many smaller and cheaper machines. 
    Cheaper and reliable.
    
Example of non-relational database.
    Google BigTable and Amaon Dynamo


Generally,
Take schemaless data.

Non-relational.

Open-source.

Trade off traditional consistency for other properties. 

Run on clusters.

![abc](./images/a.png)

## Postgres and NoSQL

Postgres supports a JSON data type.

However,
Postgres does not offer any native mechanisms for data availability or scaling the database beyond a single server.

Lacks mechanisms for automatic failover and recovery between database replicas.

No native mechanisms to partition (shard) the database across a cluster of nodes.

It is not as natural to work with JSON data in Postgres.

The non-standard extensions to SQL to query and manipulate JSON are not


![abc](./images/b.png)

## Aggregate-oriented NoSQL Databases

Aggregate: Collection of related objects treated as a unit.

For analyzing data, you might want to place some data together as a unit.

On a cluster, an aggregate is stored together on a node.

A single aggregate is a unit of atomic updates.

Aggregate-oriented databases use aggregates

indexed by key for data lookup.

Aggregate-oriented databases don’t have ACID
transactions that span multiple aggregates. 

cf. RDBMS is aggregate-ignorant.

**Pros**

Provides clearer semantics to consider by focusing on the aggregate unit used by applications.

Better design choice for running on a cluster.

**Cons**

Drawing boundaries of an aggregate is not easy.

When a goal of data management/analysis is not clear, aggregate models might not be the best choice.

Doesn’t support ACID transactions.

## Example 2

Draw boundaries of an aggregate.

![abc](./images/c.png)

Types

1) Key-value and 2) Document Database

Each aggregate has a key (ID). ◦ Key-value database

We can store whatever we want in aggregates.

Values are opaque, the store doesn't know anything about them.

Key lookup for the entire aggregate. ◦ Document database

It has allowable structures and types.

Access by key and also by the fields in the aggregates. (can retrieve fields in the value.)

Key-value and Document DB are similar, and their distinction is often blurry.
But with Document DB, you can submit a query based on the internal structure of the document.

## Example 3

Find all with grade is ”C” in “business” collection under the “msan697” database.

db.business.find({'grades.grade':'C'})