# Run Spark Locally


In [1]:
from pyspark.sql import SparkSession

# spark session is now the entry point of spark program
# for line 8, can use local[n] for run spark locally with n cores
spark = SparkSession.builder \
    .master('local') \
    .appName('Spark Try') \
    .getOrCreate()

print spark
spark.stop()

<pyspark.sql.session.SparkSession object at 0x7ffb8c1e5750>


# Run Spark Standalone Cluster

The following piece of code will run Spark on its standalone cluster mode. Please don't redo this before you stop a cluster....

In [32]:
import os

# start spark master program on this machine and register worker nodes
os.system('start-spark-slurm.sh&') # use & to put it into background

# get ip address of this machine
ip = os.popen('hostname -i').read().strip('\n')
print 'Spark Master web UI: http://{}:8080'.format(ip)

Spark Master web UI: http://10.50.221.12:8080


Start a session and open web UI (you have it when you run spark locally, too).

In [34]:
from pyspark.sql import SparkSession
import os
    
# get ip address of master node (current machine)
ip = os.popen('hostname -i').read().strip('\n')

# change 'local' to be ip of master node
spark = SparkSession.builder \
    .master('spark://' + ip + ':7077') \
    .appName('Spark Try') \
    .getOrCreate()
print 'Spark Session web UI: http://{}:4040'.format(ip)

Spark Session web UI: http://10.50.221.12:4040


A Small Map/Reduce run

In [35]:
import math

npart = 200
data = spark.sparkContext.parallelize(xrange(1000000000), npart)
print data.map(lambda x: math.sqrt(float(x*2))).reduce(lambda a, b: a + b)

2.98142396776e+13


Stop session & cluster

In [36]:
spark.stop()                      # Spark Session web UI will stop
os.system('stop-spark-slurm.sh')  # Spark Master web UI will stop

0

# First View of `RDD`

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# spark session is now the entry point of spark program
spark = SparkSession.builder \
    .master('local') \
    .appName('Spark Try') \
    .getOrCreate()

# create RDD from in memory collections
npartition = 10
data_to_dist = xrange(10000)
data = spark.sparkContext.parallelize(data_to_dist, npartition)
print type(data)
print 'number of partitions: {}'.format(data.getNumPartitions())

# get all data
print type(data.collect()), len(data.collect())

# get first 10 entries
print data.take(10)

# a small map reduce
def f(x):
    return x*2
print data.map(f).reduce(lambda a, b: a + b)
print sum([i*2 for i in data_to_dist])
    
# create RDD from file
lines = spark.sparkContext.textFile('/software/spark-2.1-el7-x86_64/examples/src/main/resources/people.txt')
print lines.collect()
print lines.map(lambda l: l.split(',')[0]).reduce(lambda a, b: a + '__' + b)
spark.stop()

<class 'pyspark.rdd.PipelinedRDD'>
number of partitions: 10
<type 'list'> 10000
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
99990000
99990000
[u'Michael, 29', u'Andy, 30', u'Justin, 19']
Michael__Andy__Justin


# `DataFrame` from file

In [18]:
from pyspark.sql import SparkSession

spark  = SparkSession.builder.master('local').appName('SOU').getOrCreate()
df = spark.read.json('/project/cmsc25025/sou/speeches.json')
print type(df)
df.show()

# SQL-like operations
df.groupBy('president').count().show(8)

# the data frame is untyped...
df.printSchema()
df.filter(df['year'] > 2010).show()

# print first 500 chars of speeches
# df.rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
speech = df.rdd.map(lambda x: [(x['president'], int(x['year']), x['text'])]).reduce(lambda a, b: a+b)
for j in xrange(3):
    print "%s (%d):\n%s\n%s...\n\n" % (speech[j][0], speech[j][1], 30*"-", speech[j][2][0:500])
spark.stop()

<class 'pyspark.sql.dataframe.DataFrame'>
+--------------------+--------------------+----+
|           president|                text|year|
+--------------------+--------------------+----+
|        James Monroe| Fellow-Citizens ...|1821|
|    William McKinley| To the Senate an...|1897|
|Dwight D. Eisenhower|[Delivered in per...|1960|
|     Calvin Coolidge|Since the close o...|1923|
|       James Madison| Fellow-Citizens ...|1816|
|    Grover Cleveland| To the Congress ...|1886|
|   John Quincy Adams| Fellow Citizens ...|1827|
|  Theodore Roosevelt| To the Senate an...|1905|
|   Lyndon B. Johnson|Mr. Speaker, Mr. ...|1965|
|       James K. Polk| Fellow-Citizens ...|1848|
|      Woodrow Wilson|Gentlemen of the ...|1913|
|Dwight D. Eisenhower|[Delivered in per...|1955|
|         George Bush|Mr. President and...|1991|
|     Franklin Pierce| Fellow-Citizens ...|1856|
...|1944| D. Roose...|To the Congress:
|   Lyndon B. Johnson|[Delivered in per...|1968|
|      Andrew Johnson| Fellow-Citizen

# `DataFrame` from `RDD`

In [3]:
spark  = SparkSession.builder.appName('spark_try').getOrCreate()

# can also create dataset from RDD
lines = spark.sparkContext.textFile('/software/spark-2.1-el7-x86_64/examples/src/main/resources/people.txt')
ppl = lines.map(lambda l: l.split(",")).map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(ppl)
schemaPeople.show()
schemaPeople.createOrReplaceTempView('people')

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are Dataframe objects.
teenNames = teenagers.rdd.map(lambda p: 'Name: ' + p['name']).collect()
for name in teenNames:
    print(name)
spark.stop()

+---+-------+
|age|   name|
+---+-------+
| 29|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+

Name: Justin


**Spark is creating more high level abstraction. `DataFrame` encapsulates RDD and support SQL-like queries. `pyspark.ml`  uses `DataFrame`. But sometimes lower level abstractions are easier to process.... `pyspark.mllib` uses `RDD`. We could be flexible according to our datastructure and context.**

# In Class Lab - print word count of speeches per year

In [1]:
from pyspark.sql import SparkSession

spark  = SparkSession.builder.master('local').appName('SOU').getOrCreate()
df = spark.read.json('/project/cmsc25025/sou/speeches.json')

print df.rdd.map(lambda x: len(x['text'].split(' '))).collect()

spark.stop()

[5765, 12019, 5633, 6657, 3337, 14927, 6887, 24925, 4406, 21124, 3531, 7233, 3968, 10398, 3693, 4812, 11930, 4507, 5133, 13913, 9950, 2054, 7225, 19329, 27243, 3825, 12097, 14527, 3197, 5502, 6902, 4825, 2284, 6438, 13346, 19434, 2189, 4604, 6053, 3405, 4467, 27731, 4468, 12884, 11367, 10710, 5341, 1950, 7277, 5248, 9173, 3105, 5368, 16030, 1393, 3256, 4707, 7475, 4431, 3197, 6860, 12270, 4944, 6384, 2840, 9692, 6749, 5534, 9514, 3650, 2200, 7378, 15691, 12266, 6062, 7090, 13131, 2687, 2659, 3212, 8736, 8257, 1816, 14992, 5591, 19501, 4948, 2086, 8261, 3472, 3466, 6874, 11524, 8624, 9185, 14853, 2090, 2422, 4465, 3221, 7227, 3454, 4919, 7989, 13815, 4708, 10751, 23667, 2841, 1355, 4559, 13321, 7816, 23466, 4821, 2260, 11410, 10094, 2894, 25130, 8497, 4377, 1654, 4140, 9274, 5336, 5432, 7812, 7804, 9820, 4658, 7659, 6141, 5548, 12106, 8020, 8337, 2074, 7026, 3770, 8206, 5287, 2248, 5552, 10448, 13585, 3208, 13407, 4104, 5676, 9856, 27913, 18119, 7120, 4144, 10846, 2687, 15385, 7975, 960