In [1]:
import pyspark
import string

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import (
    col,
    udf,
    date_format,
    to_date
)
from pyspark.sql.types import (
    StringType,
    ArrayType,
    StructField,
    StructType,
    FloatType,
    IntegerType,
)
from pyspark.sql.functions import upper, lower, col, udf

### Pyspark docs are good and very useful
https://spark.apache.org/docs/latest/api/python/index.html

<img src="https://spark.apache.org/docs/latest/img/cluster-overview.png" alt="drawing">
Image from https://spark.apache.org/docs/latest/cluster-overview.html

<img src="https://d1jnx9ba8s6j9r.cloudfront.net/blog/wp-content/uploads/2016/11/MapReduce-Way-MapReduce-Tutorial-Edureka-768x339.png" alt="drawing"/>
Image from https://www.edureka.co/blog/mapreduce-tutorial/

## Setup

In [2]:
spark = (
        SparkSession
        .builder
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
        .getOrCreate()
    )
spark.sparkContext.setLogLevel("INFO")

In [3]:
sc = spark.sparkContext
sc, sc.emptyRDD

(<SparkContext master=local[*] appName=pyspark-shell>,
 <bound method SparkContext.emptyRDD of <SparkContext master=local[*] appName=pyspark-shell>>)

In [4]:
sqlc = pyspark.SQLContext(sc)
sqlc, sqlc.registerDataFrameAsTable

(<pyspark.sql.context.SQLContext at 0x7f60c03c5c50>,
 <bound method SQLContext.registerDataFrameAsTable of <pyspark.sql.context.SQLContext object at 0x7f60c03c5c50>>)

## APIs/Modules

### Spark (non-sql)
  - RDD
    - very flexible
    - not typed (similar to pandas object type on a row)

### Spark SQL
  - Dataframe
    - more efficient (faster)
    - typed (pyspark.sql.types)
    - supports SQL transforms/queries
  - Interactions with other data sources (reading/saving to Hive, reading from parquet, etc.)

## RDD

In [5]:
# sample text
txt = """
The Zen of Python, by Tim Peters

Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!
"""
exclude = set(string.punctuation)
txt = ''.join(char for char in txt if char not in exclude)

In [6]:
rdd = sc.parallelize(txt.split())
rdd, rdd.take(1)

(ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195, ['The'])

In [7]:
# map for simple transforms
rdd_lower = rdd.map(lambda x: x.lower())
rdd_lower.take(3)

['the', 'zen', 'of']

In [8]:
# flatMap to produce a flattened collection
# rather than a collection of collections
rdd_lower_flat = rdd.flatMap(lambda x: x.lower())
rdd_lower_flat.take(4)

['t', 'h', 'e', 'z']

In [9]:
# reduceByKey to apply an aggregation function to items with same key (first item)
rdd_word_counts = rdd_lower.map(lambda x: (x, 1))
print(rdd_word_counts.take(3))
rdd_word_sums = rdd_word_counts.reduceByKey(lambda x, y: x + y)
print(rdd_word_sums.take(3))

[('the', 1), ('zen', 1), ('of', 1)]
[('of', 3), ('python', 1), ('implicit', 1)]


In [10]:
# reduce without by key
from operator import add

num_words = rdd_word_counts.map(lambda x: x[1]).reduce(add)
num_words, rdd.count()

(143, 143)

## Dataframe

In [11]:
df = rdd.map(lambda x: Row(word=x.lower())).toDF()
df, df.take(1)

(DataFrame[word: string], [Row(word='the')])

In [12]:
# transformation functions available in pyspark.sql.functions
df = df.withColumn('word_upper', upper(df.word))
df, df.take(2)

(DataFrame[word: string, word_upper: string],
 [Row(word='the', word_upper='THE'), Row(word='zen', word_upper='ZEN')])

In [13]:
# can also create udf's
@udf(StringType())
def worddouble(s):
    return s * 2

df = df.withColumn('wordx2', worddouble(df.word))
df, df.take(2)

(DataFrame[word: string, word_upper: string, wordx2: string],
 [Row(word='the', word_upper='THE', wordx2='thethe'),
  Row(word='zen', word_upper='ZEN', wordx2='zenzen')])

In [14]:
# renaming cols
df.selectExpr("word as original_word", "word_upper", "wordx2")

DataFrame[original_word: string, word_upper: string, wordx2: string]

In [15]:
df.select(col("word").alias("original_word"), col("word_upper"), col("wordx2"))

DataFrame[original_word: string, word_upper: string, wordx2: string]

In [16]:
# aggregation using methods (similar to pandas)
word_count_df = df.groupBy('word').count()
word_count_df, word_count_df.take(3)

(DataFrame[word: string, count: bigint],
 [Row(word='those', count=1),
  Row(word='often', count=1),
  Row(word='explain', count=2)])

### the above can be done with sql strings too

In [17]:
# register dfs as tables
sqlc.registerDataFrameAsTable(df, "df")

In [18]:
# select query
sql_df = sqlc.sql("SELECT * FROM df")
sql_df, sql_df.take(2)

(DataFrame[word: string, word_upper: string, wordx2: string],
 [Row(word='the', word_upper='THE', wordx2='thethe'),
  Row(word='zen', word_upper='ZEN', wordx2='zenzen')])

In [19]:
# query with transform and alias
spark.udf.register("worddouble", worddouble)
sql_df = sqlc.sql("""
    SELECT 
        word, 
        word_upper,
        word_upper as word_upper2,
        wordx2,
        worddouble(wordx2) as wordx4
    FROM df""")
sql_df, sql_df.take(2)

(DataFrame[word: string, word_upper: string, word_upper2: string, wordx2: string, wordx4: string],
 [Row(word='the', word_upper='THE', word_upper2='THE', wordx2='thethe', wordx4='thethethethe'),
  Row(word='zen', word_upper='ZEN', word_upper2='ZEN', wordx2='zenzen', wordx4='zenzenzenzen')])

In [20]:
# query with transform and alias
spark.udf.register("worddouble", worddouble)
agg_df = sqlc.sql("""
    SELECT 
        word, 
        count(*) as count
    FROM df
    GROUP BY word
    """)
agg_df, agg_df.take(5)

(DataFrame[word: string, count: bigint],
 [Row(word='those', count=1),
  Row(word='often', count=1),
  Row(word='explain', count=2),
  Row(word='complex', count=2),
  Row(word='special', count=2)])

## Lazy evaluation

In [21]:
import time as timemod

In [22]:
%%time
def long_sleep(x):
    timemod.sleep(10)
    return x

rdd_sleep = rdd_lower.map(lambda x: long_sleep(x))

CPU times: user 17 µs, sys: 2 µs, total: 19 µs
Wall time: 22.9 µs


In [23]:
%%time
# work actually happens when you ask for a result, not before
# so compute time may not happen where you expect
rdd_sleep.take(1)

CPU times: user 15.5 ms, sys: 629 µs, total: 16.1 ms
Wall time: 10.1 s


['the']

In [24]:
def func1(x):
    return x * 2

def func2(x):
    return x.upper()

def func3(x):
    raise NotImplementedError
    return x

rdd_many_transforms = (
    rdd_lower
    .map(lambda x: func1(x))
    .map(lambda x: func2(x))
    .map(lambda x: func3(x))
)

In [25]:
# errors may happen later than you expect or 
# be due to code upstream of where a result was asked for

# errors may be hard to decipher
# generally focus on the lines concerning python and then look at scala/java 
# lines if needed
rdd_many_transforms.take(1)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 29.0 failed 1 times, most recent failure: Lost task 0.0 in stage 29.0 (TID 76, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/joel/Installs/miniconda3/envs/pipelines_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/joel/Installs/miniconda3/envs/pipelines_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/joel/Installs/miniconda3/envs/pipelines_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/joel/Installs/miniconda3/envs/pipelines_env/lib/python3.7/site-packages/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "/home/joel/Installs/miniconda3/envs/pipelines_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-24-aefe538b2c93>", line 15, in <lambda>
  File "<ipython-input-24-aefe538b2c93>", line 8, in func3
NotImplementedError

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/joel/Installs/miniconda3/envs/pipelines_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/joel/Installs/miniconda3/envs/pipelines_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/joel/Installs/miniconda3/envs/pipelines_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/joel/Installs/miniconda3/envs/pipelines_env/lib/python3.7/site-packages/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "/home/joel/Installs/miniconda3/envs/pipelines_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-24-aefe538b2c93>", line 15, in <lambda>
  File "<ipython-input-24-aefe538b2c93>", line 8, in func3
NotImplementedError

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


## Reading/Writing

### Common Data Formats
- Parquet
  - column-oriented
  - self-describing (schema is built into the file)
  - not plain text
- ORC (optimized row columnar)
  - column-oriented
  - self-describing (schema is built into the file)
  - not plain text
- Avro
  - row-oriented
  - self-describing (schema is built into the file)
  - not plain text
- JSON (ndjson, jsonl)
  - new-line delimited JSON (each line in file is a json)
  - plain text 
- csv
  - plain text
  - headers can be a nuisance

## https://blog.cloudera.com/benchmarking-apache-parquet-the-allstate-experience/

In [26]:
import seaborn as sns
import os
import shutil

basedir = "./test/"
shutil.rmtree(basedir)
csv_fp = os.path.join(basedir, "iris.csv")
os.makedirs(basedir, exist_ok=True)

In [27]:
iris = sns.load_dataset('iris')
iris.shape

(150, 5)

In [28]:
iris.head(5)

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species
0,5.1,3.5,1.4,0.2,setosa
1,4.9,3.0,1.4,0.2,setosa
2,4.7,3.2,1.3,0.2,setosa
3,4.6,3.1,1.5,0.2,setosa
4,5.0,3.6,1.4,0.2,setosa


In [29]:
iris.to_csv(csv_fp, index=False)

In [30]:
!head -3 ./test/iris.csv

sepal_length,sepal_width,petal_length,petal_width,species
5.1,3.5,1.4,0.2,setosa
4.9,3.0,1.4,0.2,setosa


In [31]:
# csv isn't self-describing so we need to specify schema if we want a specific one
sqlc.read.option('header', True).csv(csv_fp)

DataFrame[sepal_length: string, sepal_width: string, petal_length: string, petal_width: string, species: string]

In [32]:
schema = StructType([
    StructField("sepal_length", FloatType(), False),
    StructField("sepal_width", FloatType(), False),
    StructField("petal_length", FloatType(), False),
    StructField("petal_width", FloatType(), False),
    StructField("species", StringType(), False),
])
from_csv = sqlc.read.schema(schema).option('header', True).csv(csv_fp)
from_csv.printSchema()

root
 |-- sepal_length: float (nullable = true)
 |-- sepal_width: float (nullable = true)
 |-- petal_length: float (nullable = true)
 |-- petal_width: float (nullable = true)
 |-- species: string (nullable = true)



In [33]:
from_csv.write.csv('./test/to_csv/')

In [34]:
!ls ./test/to_csv/

part-00000-fa1ad407-cc7d-4661-b43d-00dd8b7e43b1-c000.csv  _SUCCESS


In [35]:
from_csv.rdd.getNumPartitions()

1

In [36]:
from_csv.repartition(4).write.csv('./test/to_csv2/')

In [37]:
!ls ./test/to_csv2/

part-00000-2263b38a-ea7e-4dba-8b1a-beebe7f274e4-c000.csv
part-00001-2263b38a-ea7e-4dba-8b1a-beebe7f274e4-c000.csv
part-00002-2263b38a-ea7e-4dba-8b1a-beebe7f274e4-c000.csv
part-00003-2263b38a-ea7e-4dba-8b1a-beebe7f274e4-c000.csv
_SUCCESS


In [38]:
from_csv.write.partitionBy("species").parquet('./test/parquet/')

In [39]:
!ls ./test/parquet/

species=setosa	species=versicolor  species=virginica  _SUCCESS
