In [4]:
# !pyspark # does not seem necessary
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row # for RDDs

from IPython.display import display
import os

# Obtain the session and context explicitly instead of relying on globals
# injected by a PySpark-launched kernel. getOrCreate() hands back the session
# that is already running when there is one, so behaviour under the pyspark
# driver is unchanged, and the notebook also works on a plain Python kernel.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Pick the resource location from the Spark master: a GCS bucket when running
# on YARN, otherwise a "resources" directory next to the current working dir.
conf = sc.getConf()
if conf.get('spark.master') == 'yarn':
    print("cloud")
    root_path = "gs://josh-spark-jupyter-resources"
else:
    print("local")
    root_path = os.path.dirname(os.getcwd()) + "/resources"

# Input files used by the cells below
json_path = root_path + "/people.json"
txt_path = root_path + "/people.txt"

# Quick sanity report of the environment
display(type(sc))
display(type(spark))
display('version:', spark.version)
# .get() so an unset SPARK_HOME is shown as None instead of raising KeyError
display('SPARK_HOME:', os.environ.get('SPARK_HOME'))

local


pyspark.context.SparkContext

pyspark.sql.session.SparkSession

'version:'

'2.0.2'

'SPARK_HOME:'

'/usr/local/Cellar/apache-spark/2.0.2/libexec'

# Basics

In [5]:
# Turn a driver-side Python list into an RDD and show the resulting type
data = list(range(1, 6))
distData = sc.parallelize(data)
display(type(distData))

# Saving and Loading SequenceFiles
import random
# Use 128 random bits for the path suffix. saveAsSequenceFile fails when the
# target path already exists, and the previous 1..10000 suffix could collide
# with output left behind by an earlier run of this cell.
tmp_file = "/tmp/blah" + ("%032x" % random.getrandbits(128))
# Pairs (1, "a"), (2, "aa"), (3, "aaa"): written out, then read back and sorted
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
rdd.saveAsSequenceFile(tmp_file)
display(sorted(sc.sequenceFile(tmp_file).collect()))


txt_path2 = root_path + "/words.txt"

# Map each line to its character count, then reduce the counts to one total
lines = sc.textFile(txt_path2)
lineLengths = lines.map(len)
totalLength = lineLengths.reduce(lambda left, right: left + right)
display(totalLength)

# passing functions
def line_word_count(s):
    """Return the number of whitespace-separated words in the line ``s``.

    The line is split on runs of whitespace rather than on a single space, so
    repeated spaces, tabs and leading/trailing blanks do not create empty
    "words", and an empty line counts as 0 words instead of 1.
    """
    return len(s.split())
# Apply the named function to every line and show one word count per line
words_per_line = sc.textFile(txt_path2).map(line_word_count)
display(words_per_line.collect())

# wordcount reduceByKey
from operator import add

# Read the file through the DataFrame reader, then take field 0 of each Row
lines = spark.read.text(txt_path2).rdd.map(lambda row: row[0])

# Split lines on single spaces, emit (token, 1) pairs and sum them per token
counts = (
    lines
    .flatMap(lambda line: line.split(' '))
    .map(lambda token: (token, 1))
    .reduceByKey(add)
)

# Bring the totals back to the driver and print one "word: count" line each
output = counts.collect()
for word, count in output:
    print("%s: %i" % (word, count))
    
# shared variables
# Broadcast a small list and read its value back
broadcastVar = sc.broadcast([1, 2, 3])
display(broadcastVar.value)

# Add every element of a four-item RDD into an accumulator that starts at 0,
# then show the accumulated total
accum = sc.accumulator(0)
accum_input = sc.parallelize([1, 2, 3, 4])
accum_input.foreach(lambda item: accum.add(item))
display(accum.value)

pyspark.rdd.RDD

[(1, 'a'), (2, 'aa'), (3, 'aaa')]

115

[5, 9, 10]

quick: 1
jumps: 1
cat: 1
Tyger: 2
night: 1
lazy: 1
brown: 1
forests: 1
hat: 1
in: 2
the: 6
burning: 1
of: 1
fox: 1
bright: 1
dog: 1
over: 1


[1, 2, 3]

10

# Spark SQL

### Creating DataFrames

In [6]:
# Build a DataFrame from the people JSON file
json_reader = spark.read
df = json_reader.json(json_path)
# Print the rows as a text table on stdout
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### Untyped Dataset Operations (aka DataFrame Operations)

In [None]:
# Schema, rendered as a tree
df.printSchema()

# Projection: just the "name" column
names_only = df.select("name")
names_only.show()

# Projection with an expression: every name alongside age + 1
names_with_next_age = df.select(df['name'], df['age'] + 1)
names_with_next_age.show()

# Row filter: keep people whose age is above 21
older_than_21 = df.filter(df['age'] > 21)
older_than_21.show()

# Aggregation: number of rows per distinct age
people_per_age = df.groupBy("age").count()
people_per_age.show()

### Running SQL Queries Programmatically

In [None]:
# Make the DataFrame queryable from SQL under the temporary view name "people"
df.createOrReplaceTempView("people")

# Run a query against that view and print the resulting DataFrame
select_all_people = "SELECT * FROM people"
sqlDF = spark.sql(select_all_people)
sqlDF.show()

### Pandas integration

In [None]:
# Spark DataFrame -> pandas DataFrame
pandas_df = df.toPandas()
# pandas DataFrame -> Spark DataFrame
df2 = spark.createDataFrame(pandas_df)

# Show the type on each side of the round trip, then the pandas frame itself
display(type(pandas_df), type(df2))
pandas_df

### RDD level

In [None]:
# Read the text file, split each line on commas, and build one Row per line
# from the first field (name) and the second field parsed as an int (age).
lines = sc.textFile(txt_path)
parts = lines.map(lambda line: line.split(","))
people = parts.map(lambda fields: Row(name=fields[0], age=int(fields[1])))
display(people)

# Let Spark infer the schema from the Rows, then register the result as the
# temporary view "people" (replacing any view already using that name).
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# Query the registered view with SQL.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The query result is a DataFrame; go through its .rdd to map each Row to a
# labelled string, collect the strings on the driver and print them.
teenNames = (
    teenagers.rdd
    .map(lambda person: "Name: " + person.name)
    .collect()
)
for name in teenNames:
    print(name)

### Schema Merging

In [None]:
# Root directory of the partitioned Parquet table; each DataFrame below is
# written to its own key=<n> subdirectory.
table_path = "/tmp/test_table"

# Create a simple DataFrame, stored into a partition directory.
# Writes use mode("overwrite") because the default save mode raises an error
# when the target path already exists, which made this cell fail on any re-run.
# NOTE: the column named `double` actually holds i ** 2 (the square).
squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
                                  .map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.mode("overwrite").parquet(table_path + "/key=1")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
                                .map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.mode("overwrite").parquet(table_path + "/key=2")

# Read the partitioned table with schema merging enabled and print the
# combined schema
mergedDF = spark.read.option("mergeSchema", "true").parquet(table_path)
mergedDF.printSchema()