### This is my first (public) notebook of playing with pyspark

The goal of this notebook is to demonstrare a simple use case of:
1. using pyspark (python wrapper for spark) and access existing spark cluster
2. loading data into spark
3. accessing the data using the rdd-api, dataframe-api, and dataset-api
4. simple data exploration

I'll be using a sample of the gutenberg project - https://web.eecs.umich.edu/~lahiri/gutenberg_dataset.html

#### The settings I'm using here:
- spark: version 2.0.0
- python: 3.5

In [7]:
import pyspark 
from pyspark.mllib.random import RandomRDDs
from pprint import pprint
import matplotlib.pyplot as plt
import numpy as np
import random
from collections import Counter #used to count the occorences of each item in numpy list

%matplotlib notebook

In [3]:
# link to existing spark cluster
sc.stop()
sc = pyspark.SparkContext(appName="spark-project")

In [8]:
# Playing with matplotlib - visualization library of spark
# This library does not plot rdd directly - rather bring the data to the local env
# First let's create a sample from the normal distribution - saved as rdd
#students_grades = sc.parallelize(np.random.normal(loc=50,scale=10,size=50000))
students_grades = RandomRDDs.normalRDD(sc, 10000).map(lambda x: 10.0 + 50.0 * x)

print(students_grades.stats())
# we sample several times, each time with more samples
fig1 = plt.figure()
cell1_fig = fig1.add_subplot(111)
cell1_fig.hist(students_grades.sample(False, 0.4).collect(), bins=50, facecolor='g', alpha=0.25)
cell1_fig.hist(students_grades.sample(False, 0.2).collect(), bins=50, facecolor='c', alpha=0.45)
cell1_fig.hist(students_grades.sample(False, 0.1).collect(), bins=50, facecolor='r', alpha=0.65)
cell1_fig.grid(True)
fig1.show()

(count: 10000, mean: 9.957705440678026, stdev: 49.96897915794375, max: 202.15845823412346, min: -168.6086704022496)


<IPython.core.display.Javascript object>

In [17]:
# Now let's add the student experience in years before attending the university
students_experience = sc.parallelize(np.random.choice(a=[1,2,3,4,5,6], size=10000, replace=True))
students_experience.count()
students_rdd = students_grades.zip(students_experience).map(lambda a:[a[0],a[1]])
#fig2 = plt.figure()
#cell2_fig = fig2.add_subplot(111)
#cell2_fig.hist(students_rdd.sample(False, 0.1).values().collect(), bins=50, facecolor='g', alpha=0.25)
#cell2_fig.plot(students_rdd.sample(False, 500).keys().collect(), students_rdd.sample(False, 500).values().collect() )
#cell2_fig.boxplot(students_rdd.sample(False, 200).collect())
#fig2.show()

[125.4296361087698, 1, 53.30749502791638]

In [None]:
# Here we demonstrare a usage of the RDD API to read a text file
rdd = sc.textFile("s3n://ak-public-sandbox/datasets/gutenberg_dataset/1/8/*/*/*")
rdd.count()
rdd.take(1) 

In [None]:
# Let's see some fitering capabilities
rdd.filter(lambda line: "French" in line).count()

In [None]:
# The good-old word count example
term_freq = rdd.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a,b: a + b)
print("Total number of unique words %s" % term_freq.count())
stop_words = term_freq.takeOrdered(5,lambda a:-a[1])
rare_words = term_freq.takeOrdered(5,lambda a:a[1])
print("5 most frequent words: (word, freq): {0}".format(stop_words))
print("5 least frequent words: (word, freq) {0}".format(rare_words))

In [None]:
# We count the number of occurences of each frequect (by value)
# For example, the term 'the' is the only which occured 37335, so we denote (37335,1).
count_per_freq = term_freq.map(lambda tf: (tf[1],1)).reduceByKey(lambda a,b: a+b)
# We use very simple bucketing, this is needed due to the collect call in the next cell - to avoid crashing the master node due to OOM
bucket_size = 50
binned_counter = count_per_freq.map(lambda freq: (freq[0] / bucket_size,freq[1])).reduceByKey(lambda a,b: a+b)

In [None]:
# let's try to plot the (binned) frequency data, using log
num_bins = 100
n, bins, patches = plt.hist(binned_counter.values().collect(), num_bins, facecolor='b', normed=1,alpha=0.9)
plt.show()

In [None]:
#ability to save the rdd directly as text file in s3
#rdd.saveAsTextFile("s3n://full/path/here")

In [None]:
# basic usage of boto3
#import boto3
#session = boto3.Session() # leave out the profile_name argument if you haven't defined profiles
#s3 = boto3.resource('s3')
#bucket = s3.Bucket(bucket_name)
#objs = bucket.objects.filter(Prefix=some_prefix)
#for obj in objs:  
#  print(obj.key)
#obj = s3.Object(bucket_name, file_full_path_in_bucket)
#content = obj.get()["Body"].read().decode("utf-8")
#print(content[:300])