# Notebook Corresponding to `pyspark` Notes

This notebook has the code and examples from the three sets of notes:
- `pyspark`: RDDs
- `pyspark`: pandas-on-Spark
- `pyspark`: Spark SQL

Each section should run on its own, without the cells from the other sections.

## RDDs
Because we are running a Docker image with Spark installed, we can create a Spark session using the following code:

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').appName('my_app').getOrCreate()

In [2]:
type(spark)

pyspark.sql.session.SparkSession

We can explicitly create an RDD using the `sparkContext.parallelize()` method on the `SparkSession` object.

In [218]:
#create some 'data' to put into an RDD
quick_cat = lambda x: "a" if x < 20 else "b"
my_data = [(quick_cat(x), x) for x in range(1,51)]
my_data[:3]
#spark session available through spark object
my_rdd = spark.sparkContext.parallelize(my_data)
my_rdd

ParallelCollectionRDD[590] at readRDDFromFile at PythonRDD.scala:274

This is an object stored (likely) over multiple partitions.

In [219]:
my_rdd.getNumPartitions()

12

We see that `my_rdd` doesn't actually print out the data when we look at the object. There may be a ton of data, so Spark doesn't show it to you by default. Instead, we can perform an action such as `.take()` to have some data returned to us.

In [220]:
my_rdd.take(3)

[('a', 1), ('a', 2), ('a', 3)]
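
The difference between describing a computation and actually running it can be imitated in plain Python with generators. The sketch below is only an analogy and not how Spark is implemented; `lazy_map` and `take` are hypothetical helper names introduced for illustration.

```python
from itertools import islice

calls = []  # records which elements were actually processed

def lazy_map(func, data):
    """Describe a transformation; nothing runs until values are requested."""
    for item in data:
        calls.append(item)
        yield func(item)

def take(n, iterable):
    """Action-like helper: pull only the first n results."""
    return list(islice(iterable, n))

pipeline = lazy_map(lambda x: x * 10, range(1, 51))  # no work done yet
work_before = len(calls)
first_three = take(3, pipeline)                      # work happens here
print(work_before, first_three, len(calls))
```

Only the three requested elements are ever processed, which is the same reason `.take(3)` can be much cheaper than `.collect()` on a large RDD.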

When the data are passed in as tuples, the first element of each tuple represents the key and the second the associated value.

In [221]:
my_rdd.keys().take(3)

['a', 'a', 'a']

In [222]:
my_rdd.values().take(3)

[1, 2, 3]

This allows us to do operations by key if we'd like! Note that `.count()` and `.countByKey()` are actions and so they return the value locally.

In [266]:
my_rdd.count()

50

In [224]:
my_rdd.countByKey()

defaultdict(int, {'a': 19, 'b': 31})

If we wanted to use the result of this counting operation as a new RDD, we could use `.groupByKey()` followed by the `.mapValues()` method. This returns an RDD rather than a value, so we need to use `.collect()` to see the data.

In [225]:
my_rdd \
    .groupByKey() \
    .mapValues(len) \
    .collect()

[('b', 31), ('a', 19)]
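
To see what the two steps are doing, here is a rough plain-Python equivalent of the grouping and the value-only transformation. It runs on a single machine and only mirrors the logic; the names `groups` and `counts` are made up for this sketch.

```python
from collections import defaultdict

quick_cat = lambda x: "a" if x < 20 else "b"
my_data = [(quick_cat(x), x) for x in range(1, 51)]

# "groupByKey": gather every value under its key
groups = defaultdict(list)
for key, value in my_data:
    groups[key].append(value)

# "mapValues(len)": transform only the grouped values, keys stay untouched
counts = {key: len(values) for key, values in groups.items()}
print(counts)
```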

With this, we could do some other transformation on the resulting object (say using `.map()`, which can apply a function to each element of our RDD). For instance, creating a log transformed value as well.

In [229]:
from numpy import log
my_rdd \
    .groupByKey() \
    .mapValues(len) \
    .map(lambda x: (x[0], x[1], log(x[1]))) \
    .collect()

[('b', 31, 3.4339872044851463), ('a', 19, 2.9444389791664403)]

RDD functions are hard to use though! We might want to find the total sum of the values for each key. We can use `.groupByKey()` and `mapValues()` for this but the documentation says it is better to use `aggregateByKey()`. But this function requires some confusing arguments.

In [230]:
my_rdd \
    .groupByKey() \
    .mapValues(sum) \
    .collect()

[('b', 1085), ('a', 190)]

In [232]:
my_rdd \
  .aggregateByKey(0, #initial value of the accumulator for each key on each partition
                  #how to fold values into the accumulator on the same partition
                  lambda within_1, within_2: within_1 + within_2,
                  #how to combine the accumulators across partitions
                  lambda across_1, across_2: across_1 + across_2) \
  .collect()

[('b', 1085), ('a', 190)]
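
The three arguments are easier to follow when the partitions are made explicit. Below is a minimal single-machine sketch of the idea, assuming the data are split into three arbitrary partitions; `aggregate_partition` and `merge` are hypothetical helpers, not part of the Spark API.

```python
from functools import reduce

quick_cat = lambda x: "a" if x < 20 else "b"
my_data = [(quick_cat(x), x) for x in range(1, 51)]

# Pretend the data live on three partitions (the split is arbitrary here).
partitions = [my_data[0:15], my_data[15:35], my_data[35:50]]

zero_value = 0
seq_op = lambda acc, value: acc + value        # fold one value into a partition's accumulator
comb_op = lambda acc_1, acc_2: acc_1 + acc_2   # merge accumulators from different partitions

def aggregate_partition(partition):
    """Apply seq_op within one partition, starting each key at zero_value."""
    accs = {}
    for key, value in partition:
        accs[key] = seq_op(accs.get(key, zero_value), value)
    return accs

def merge(left, right):
    """Apply comb_op to combine per-partition results key by key."""
    merged = dict(left)
    for key, acc in right.items():
        merged[key] = comb_op(merged[key], acc) if key in merged else acc
    return merged

totals = reduce(merge, map(aggregate_partition, partitions))
print(totals)
```

The totals agree with the Spark output above. Only one accumulator per key leaves each partition, which is why this pattern moves less data than grouping all of the values first.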

Let's do a bit more involved example to understand how things work. Remember that the computations are not done until you actually do an action (like `.take()`). 

Let's create a more involved `.values()` within our RDD. This can be done with `.map()` as we saw earlier.

In [348]:
from numpy import sqrt
my_rdd.map(lambda x: (x[0], (x[1], x[1]**2, sqrt(x[1])))).take(3)

[('a', (1, 1, 1.0)),
 ('a', (2, 4, 1.4142135623730951)),
 ('a', (3, 9, 1.7320508075688772))]

As each record is still a tuple, the first element is returned by `.keys()` and the second by `.values()`.

In [234]:
my_rdd.map(lambda x: (x[0], (x[1], x[1]**2, sqrt(x[1])))) \
    .keys() \
    .take(3)

['a', 'a', 'a']

In [235]:
my_rdd.map(lambda x: (x[0], (x[1], x[1]**2, sqrt(x[1])))) \
    .values() \
    .take(3)

[(1, 1, 1.0), (2, 4, 1.4142135623730951), (3, 9, 1.7320508075688772)]

We've seen `.filter()`-type functions before. These allow us to subset our data. In this case, let's keep only the data where the original integer value is less than or equal to 35. Just to show the process, I'm going to keep adding to our computation rather than saving the result somewhere.

In [270]:
my_rdd.map(lambda x: (x[0], (x[1], x[1]**2, sqrt(x[1])))) \
    .filter(lambda x: x[1][0] <= 35) \
    .collect()

[('a', (1, 1, 1.0)),
 ('a', (2, 4, 1.4142135623730951)),
 ('a', (3, 9, 1.7320508075688772)),
 ('a', (4, 16, 2.0)),
 ('a', (5, 25, 2.23606797749979)),
 ('a', (6, 36, 2.449489742783178)),
 ('a', (7, 49, 2.6457513110645907)),
 ('a', (8, 64, 2.8284271247461903)),
 ('a', (9, 81, 3.0)),
 ('a', (10, 100, 3.1622776601683795)),
 ('a', (11, 121, 3.3166247903554)),
 ('a', (12, 144, 3.4641016151377544)),
 ('a', (13, 169, 3.605551275463989)),
 ('a', (14, 196, 3.7416573867739413)),
 ('a', (15, 225, 3.872983346207417)),
 ('a', (16, 256, 4.0)),
 ('a', (17, 289, 4.123105625617661)),
 ('a', (18, 324, 4.242640687119285)),
 ('a', (19, 361, 4.358898943540674)),
 ('b', (20, 400, 4.47213595499958)),
 ('b', (21, 441, 4.58257569495584)),
 ('b', (22, 484, 4.69041575982343)),
 ('b', (23, 529, 4.795831523312719)),
 ('b', (24, 576, 4.898979485566356)),
 ('b', (25, 625, 5.0)),
 ('b', (26, 676, 5.0990195135927845)),
 ('b', (27, 729, 5.196152422706632)),
 ('b', (28, 784, 5.291502622129181)),
 ('b', (29, 841, 5.385164

Now we could do other transformations on our values. For instance, summing the three numbers. This can be done with `.mapValues()`, which applies the `sum` function to each of the **values** in the RDD (not to both the keys and values like `.map()` above did).

In [237]:
my_rdd.map(lambda x: (x[0], (x[1], x[1]**2, sqrt(x[1])))) \
    .filter(lambda x: x[1][0] <= 35) \
    .mapValues(sum) \
    .collect()

[('a', 3.0),
 ('a', 7.414213562373095),
 ('a', 13.732050807568877),
 ('a', 22.0),
 ('a', 32.236067977499786),
 ('a', 44.44948974278318),
 ('a', 58.64575131106459),
 ('a', 74.82842712474618),
 ('a', 93.0),
 ('a', 113.16227766016839),
 ('a', 135.3166247903554),
 ('a', 159.46410161513776),
 ('a', 185.605551275464),
 ('a', 213.74165738677394),
 ('a', 243.8729833462074),
 ('a', 276.0),
 ('a', 310.12310562561765),
 ('a', 346.24264068711926),
 ('a', 384.3588989435407),
 ('b', 424.4721359549996),
 ('b', 466.58257569495584),
 ('b', 510.69041575982345),
 ('b', 556.7958315233127),
 ('b', 604.8989794855663),
 ('b', 655.0),
 ('b', 707.0990195135928),
 ('b', 761.1961524227066),
 ('b', 817.2915026221292),
 ('b', 875.3851648071345),
 ('b', 935.4772255750516),
 ('b', 997.5677643628301),
 ('b', 1061.6568542494924),
 ('b', 1127.744562646538),
 ('b', 1195.8309518948454),
 ('b', 1265.9160797830996)]

We can use `.map()` instead if we wanted to.

In [361]:
#just to show the difference between .map() and .mapValues()
my_rdd.map(lambda x: (x[0], (x[1], x[1]**2, sqrt(x[1])))) \
    .filter(lambda x: x[1][0] <= 35) \
    .map(lambda x: (x[0], x[1][0]+x[1][1]+x[1][2])) \
    .collect()

[('a', 3.0),
 ('a', 7.414213562373095),
 ('a', 13.732050807568877),
 ('a', 22.0),
 ('a', 32.236067977499786),
 ('a', 44.44948974278318),
 ('a', 58.64575131106459),
 ('a', 74.82842712474618),
 ('a', 93.0),
 ('a', 113.16227766016839),
 ('a', 135.3166247903554),
 ('a', 159.46410161513776),
 ('a', 185.605551275464),
 ('a', 213.74165738677394),
 ('a', 243.8729833462074),
 ('a', 276.0),
 ('a', 310.12310562561765),
 ('a', 346.24264068711926),
 ('a', 384.3588989435407),
 ('b', 424.4721359549996),
 ('b', 466.58257569495584),
 ('b', 510.69041575982345),
 ('b', 556.7958315233127),
 ('b', 604.8989794855663),
 ('b', 655.0),
 ('b', 707.0990195135928),
 ('b', 761.1961524227066),
 ('b', 817.2915026221292),
 ('b', 875.3851648071345),
 ('b', 935.4772255750516),
 ('b', 997.5677643628301),
 ('b', 1061.6568542494924),
 ('b', 1127.744562646538),
 ('b', 1195.8309518948454),
 ('b', 1265.9160797830996)]
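
The contrast between the two methods can be sketched in plain Python. The helpers `map_values` and `map_records` below are hypothetical stand-ins written for this illustration; they only mimic what the RDD methods hand to your function.

```python
from math import sqrt

quick_cat = lambda x: "a" if x < 20 else "b"
pairs = [(quick_cat(x), (x, x**2, sqrt(x))) for x in (1, 4, 25)]

def map_values(func, records):
    """Like .mapValues(): func sees only the value, the key is carried along."""
    return [(key, func(value)) for key, value in records]

def map_records(func, records):
    """Like .map(): func sees the whole (key, value) record."""
    return [func(record) for record in records]

via_map_values = map_values(sum, pairs)
via_map = map_records(lambda x: (x[0], x[1][0] + x[1][1] + x[1][2]), pairs)
print(via_map_values)
print(via_map_values == via_map)
```

With `.map()` the function has to rebuild the key itself, which is why the lambda returns a full `(key, value)` tuple.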

### RDD with Just Elements

Note that if we don't give the RDD tuples, then we don't have keys or values; we just have the elements themselves (`my_rdd2.values()` doesn't work).

In [362]:
my_rdd2 = spark.sparkContext.parallelize([i for i in range(0,20)])
my_rdd2.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

We can still apply functions to this type of RDD though.

In [364]:
my_rdd2.flatMap(lambda x: (x, x**2)).collect()

[0,
 0,
 1,
 1,
 2,
 4,
 3,
 9,
 4,
 16,
 5,
 25,
 6,
 36,
 7,
 49,
 8,
 64,
 9,
 81,
 10,
 100,
 11,
 121,
 12,
 144,
 13,
 169,
 14,
 196,
 15,
 225,
 16,
 256,
 17,
 289,
 18,
 324,
 19,
 361]

In [365]:
my_rdd2.flatMap(lambda x: (x, x**2)).mean()

66.5
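
What `.flatMap()` does differently from `.map()` can be shown with ordinary lists. This is a small plain-Python sketch of the same computation, using `itertools.chain` to do the flattening.

```python
from itertools import chain

data = list(range(0, 20))
func = lambda x: (x, x**2)

# map-like: one tuple per input element
mapped = [func(x) for x in data]

# flatMap-like: the tuples are flattened into a single sequence of numbers
flat_mapped = list(chain.from_iterable(func(x) for x in data))

print(mapped[:3])
print(flat_mapped[:6])
print(sum(flat_mapped) / len(flat_mapped))
```

The flattened result has twice as many elements as the input, and its mean matches the value returned by `.mean()` above.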

### MapReduce Example Done Explicitly Using RDDs

Recall that in the Hadoop section, we did a MapReduce algorithm to count the number of words in Oliver Twist.  We can redo that example using Spark!  It will actually be parallelized and all that automatically across our machine too!  

In [271]:
#create a spark session object (simplified, defaults to local)
spark = SparkSession.builder.getOrCreate()

Now let's read in our 53 chapters as a list.

In [273]:
my_chap = []
for i in range(1, 54):
    with open('dickens/chap' + str(i) + '.txt', 'r') as f:
        my_chap.append(f.read())

We want Spark to handle this using RDDs explicitly. We can do that via the `sparkContext.parallelize()` method. This just tells Spark to take our list and distribute it/prepare it for parallel computations.

Let's create some RDDs! We don't care about the chapters themselves, we just want the final word counts. This means we can start with an RDD without keys.

In [326]:
my_chap_rdd = spark.sparkContext.parallelize(my_chap)
type(my_chap_rdd)

pyspark.rdd.RDD

In [331]:
my_chap_rdd.take(1)

['chapter i  treats of the place where oliver twist was born and of the circumstances attending his birth  among other public buildings in a certain town which for many reasons it will be prudent to refrain from mentioning and to which i will assign no fictitious name there is one anciently common to most towns great or small to wit a workhouse and in this workhouse was born on a day and date which i need not trouble myself to repeat inasmuch as it can be of no possible consequence to the reader in this stage of the business at all events the item of mortality whose name is prefixed to the head of this chapter  for a long time after it was ushered into this world of sorrow and trouble by the parish surgeon it remained a matter of considerable doubt whether the child would survive to bear any name at all in which case it is somewhat more than probable that these memoirs would never have appeared or if they had that being comprised within a couple of pages they would have possessed the i

Great, now we want to take these different values (each chapter is a value) and split those strings by spaces. 

In [333]:
my_chap_rdd \
    .flatMap(lambda x: x.split(" ")) \
    .take(10)

['chapter',
 'i',
 '',
 'treats',
 'of',
 'the',
 'place',
 'where',
 'oliver',
 'twist']

Now we have an RDD whose elements are each word (again no keys). First let's filter to remove any empty strings.

In [454]:
my_chap_rdd \
    .flatMap(lambda x: x.split(" ")) \
    .filter(lambda x: x != "") \
    .take(10)

['chapter',
 'i',
 'treats',
 'of',
 'the',
 'place',
 'where',
 'oliver',
 'twist',
 'was']

Let's do a transform on this where we make each word a key and assign it a value of 1.

In [456]:
my_chap_rdd \
    .flatMap(lambda x: x.split(" ")) \
    .filter(lambda x: x != "") \
    .map(lambda word: (word, 1)) \
    .take(5)

[('chapter', 1), ('i', 1), ('treats', 1), ('of', 1), ('the', 1)]

Nice! Now we just need to reduce this by key and we are done!

In [457]:
my_chap_rdd \
    .flatMap(lambda x: x.split(" ")) \
    .filter(lambda x: x != "") \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .take(10)

[('of', 3686),
 ('no', 569),
 ('there', 478),
 ('common', 27),
 ('stage', 5),
 ('business', 45),
 ('item', 2),
 ('head', 215),
 ('have', 736),
 ('extant', 1)]

Let's sort this by the counts. We can sort descending by simply sorting on the negative of the values.

In [458]:
my_chap_rdd \
    .flatMap(lambda x: x.split(" ")) \
    .filter(lambda x: x != "") \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: -x[1]) \
    .take(10)

[('the', 9272),
 ('and', 5138),
 ('to', 3710),
 ('of', 3686),
 ('a', 3603),
 ('in', 2286),
 ('he', 2279),
 ('his', 2247),
 ('that', 1761),
 ('was', 1729)]
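
The whole pipeline can be traced by hand on a tiny input. The sketch below repeats each step in plain Python on a made-up two-element list (`chapters` here is a hypothetical stand-in, not the Dickens text), so every intermediate result is small enough to inspect.

```python
from collections import defaultdict

# Hypothetical stand-in for the chapters; note the double space, as in the real text.
chapters = ["the cat and  the hat", "the end"]

# flatMap + filter: split every chapter into words and drop empty strings
words = [w for chapter in chapters for w in chapter.split(" ") if w != ""]

# map: pair each word with a count of 1
pairs = [(w, 1) for w in words]

# reduceByKey: combine the values for each key with a + b
counts = defaultdict(int)
for word, one in pairs:
    counts[word] = counts[word] + one

# sortBy on the negative count gives descending order
ranked = sorted(counts.items(), key=lambda x: -x[1])
print(ranked)
```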

Let's collect all of the data and turn it into a regular pandas data frame.

In [460]:
results = my_chap_rdd \
    .flatMap(lambda x: x.split(" ")) \
    .filter(lambda x: x != "") \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: -x[1]) \
    .collect()
import pandas as pd
df_results = pd.DataFrame(results, columns = ["word", "count"])
df_results

Unnamed: 0,word,count
0,the,9272
1,and,5138
2,to,3710
3,of,3686
4,a,3603
...,...,...
11067,associated,1
11068,illfeigned,1
11069,nay,1
11070,extenuation,1


## pandas-on-Spark

Below is the code from the notes on pandas-on-Spark.

First let's import our modules.

In [367]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps



Create a pandas-on-Spark series via `ps.Series()`

In [373]:
ps.Series([1, 3, 5, np.nan, 6, 8]) #ignore the warning

  fields = [
  for column, series in pdf.iteritems():


0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

Create a pandas-on-Spark DataFrame via `ps.DataFrame()`

In [374]:
ps.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])


Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,3,300,three
40,4,400,four
50,5,500,five
60,6,600,six


Convert from a pandas DataFrame to a pandas-on-Spark DataFrame easily.

In [382]:
pdf = pd.read_csv("https://www4.stat.ncsu.edu/~online/datasets/red-wine.csv", delimiter = ";")
psdf = ps.from_pandas(pdf)
psdf.head()


Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5


Can subset the data using tools we already know, such as the `.loc[]` indexer.

In [385]:
psdf.loc[psdf.quality > 5, ["alcohol", "quality"]].head()

Unnamed: 0,alcohol,quality
3,9.8,6
7,10.0,7
8,9.5,7
16,10.5,7
19,9.2,6


Can also read data directly into a pandas-on-Spark DataFrame using the `ps.read_csv()` function (can't read from a URL though).

In [386]:
titanic_ps = ps.read_csv("titanic.csv") #can't call from URL
titanic_ps["survived"].value_counts()



0    809
1    500
Name: survived, dtype: int64

Can now do our usual summarizations using the `.groupby()` method along with a summarization method.

In [388]:
titanic_ps.groupby("survived").mean()

survived,pclass,age,sibsp,parch,fare,body
1,1.962,28.918228,0.462,0.476,49.361184,
0,2.500618,30.545369,0.521632,0.328801,23.353831,160.809917


In [389]:
titanic_ps.describe()

Unnamed: 0,pclass,survived,age,sibsp,parch,fare,body
count,1309.0,1309.0,1046.0,1309.0,1309.0,1308.0,121.0
mean,2.294882,0.381971,29.881135,0.498854,0.385027,33.295479,160.809917
std,0.837836,0.486055,14.4135,1.041658,0.86556,51.758668,97.696922
min,1.0,0.0,0.1667,0.0,0.0,0.0,1.0
25%,2.0,0.0,21.0,0.0,0.0,7.8958,72.0
50%,3.0,0.0,28.0,0.0,0.0,14.4542,155.0
75%,3.0,1.0,39.0,1.0,0.0,31.275,256.0
max,3.0,1.0,80.0,8.0,9.0,512.3292,328.0


We can also use the `.transform()` and `.apply()` methods (also used in regular pandas) to perform other common operations.

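First we can transform the values in our columns, for example by shifting and scaling them. Note that the function below adds the column mean before dividing by the standard deviation; centering in the usual sense would subtract the mean instead (`pser - pser.mean()`).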

In [390]:
def standardize(pser):
     return (pser + pser.mean())/pser.std()  # should always return the same length as input.

In [399]:
titanic_ps[["age", "fare"]] \
    .rename(columns = {"age": "o_age", "fare": "o_fare"}) \
    .join(titanic_ps[["age", "fare"]]
              .transform(standardize)) \
    .head()


Unnamed: 0,o_age,o_fare,age,fare
0,29.0,211.3375,4.085138,4.726416
1,0.9167,151.55,2.136735,3.571295
2,2.0,151.55,2.211894,3.571295
3,30.0,151.55,4.154517,3.571295
4,25.0,151.55,3.80762,3.571295
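
For reference, centering and scaling a column in the usual sense subtracts the mean before dividing by the sample standard deviation (the default for pandas `.std()`). Here is a minimal pure-Python sketch of that arithmetic on a handful of values; it uses the `statistics` module rather than pandas-on-Spark, and this `standardize` is a separate illustration, not the function used above.

```python
from statistics import mean, stdev

def standardize(values):
    """Center and scale: subtract the mean, divide by the sample standard deviation."""
    m = mean(values)
    s = stdev(values)  # sample standard deviation (n - 1 in the denominator)
    return [(v - m) / s for v in values]

ages = [29.0, 2.0, 30.0, 25.0]  # a few ages from the output above
z = standardize(ages)
print(z)
print(round(mean(z), 10), round(stdev(z), 10))
```

After this transformation the values have mean 0 and standard deviation 1 (up to floating-point error), and the result has the same length as the input, which is what `.transform()` requires.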


Can use `.apply()` to possibly return something shorter than the original.


In [400]:
def standardize_positives(pser):
     return (pser[pser>30] + pser[pser>30].mean())/pser[pser>30].std()  
# can return something shorter than the input length

In [None]:
titanic_ps[["age"]] \
    .rename(columns = {"age": "o_age"}) \
    .join(titanic_ps[["age"]].apply(standardize_positives)) \
    .head()

In [406]:
titanic_ps[["age"]] \
    .apply(standardize_positives) \
    .head()


Unnamed: 0,age
5,9.135889
6,10.636052
7,8.235791
8,9.635943
9,11.436139


### MapReduce Example Done via pandas-on-Spark

Let's repeat our map reduce example but, you know, do it more easily :)

Recall the `my_chap` object.

In [409]:
#read in each chapter to a list element
my_chap = []
for i in range(1, 54):
    with open('dickens/chap' + str(i) + '.txt', 'r') as f:
        my_chap.append(f.read())
        
my_chap[0]

'chapter i  treats of the place where oliver twist was born and of the circumstances attending his birth  among other public buildings in a certain town which for many reasons it will be prudent to refrain from mentioning and to which i will assign no fictitious name there is one anciently common to most towns great or small to wit a workhouse and in this workhouse was born on a day and date which i need not trouble myself to repeat inasmuch as it can be of no possible consequence to the reader in this stage of the business at all events the item of mortality whose name is prefixed to the head of this chapter  for a long time after it was ushered into this world of sorrow and trouble by the parish surgeon it remained a matter of considerable doubt whether the child would survive to bear any name at all in which case it is somewhat more than probable that these memoirs would never have appeared or if they had that being comprised within a couple of pages they would have possessed the in

Let's put this into a pandas-on-Spark series and manipulate from there!

In [439]:
#combine the list elements into one large string
from functools import reduce
big_string = reduce(lambda x, y: x + y, my_chap)

#create a series with the big string
chap_pss = ps.Series(big_string)
chap_pss


0    chapter i  treats of the place where oliver tw...
dtype: object

Now use the `.str.split()` method on a pandas-on-Spark series to create a series with a list of words.

In [442]:
chap_pss.str.split(" ")

0    [chapter, i, , treats, of, the, place, where, ...
dtype: object

What we want is for each word to be in one column with its associated count in another column. Let's convert the list stored in the series to a data frame and then remove the empty strings.

In [461]:
word_df = ps.DataFrame(chap_pss.str.split(" ")[0], columns = ["word"])
word_df.head()


Unnamed: 0,word
0,chapter
1,i
2,
3,treats
4,of


In [467]:
word_df = word_df.loc[word_df.word != ""]
word_df.head()

Unnamed: 0,word
0,chapter
1,i
3,treats
4,of
5,the


First, we can assign each word a count of 1 and then use our usual `.groupby()` to get our desired result.

In [468]:
word_df["count"] = 1
word_df.head()

Unnamed: 0,word,count
0,chapter,1
1,i,1
3,treats,1
4,of,1
5,the,1


Awesome! Now just group by the word and sum it up!

In [469]:
word_df \
    .groupby("word") \
    .sum() \
    .head()

word,count
some,366
few,102
hope,64
destitute,4
still,87


Sort it so we can compare to our previous work.

In [472]:
word_df \
    .groupby("word") \
    .sum() \
    .sort_values(by = "count", ascending = False) \
    .head()

word,count
the,9272
and,5138
to,3710
of,3686
a,3603


Woot!

## Spark SQL

Below is the code from the Spark SQL notes.

Start by creating a Spark session.

In [482]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').appName('my_app').getOrCreate()

Now let's look at a few ways to create a Spark SQL Data Frame.

In [484]:
from pyspark.sql import Row
from datetime import datetime, date
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [485]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [486]:
df = spark.read.load("neuralgia.csv",
                     format="csv", 
                     sep=",", 
                     inferSchema="true", 
                     header="true")
df

DataFrame[Treatment: string, Sex: string, Age: int, Duration: int, Pain: string]

In [487]:
import pandas as pd
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df


DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Conveniently, we can go back and forth between Spark SQL style Data Frames and pandas-on-Spark style Data Frames.

In [489]:
sdf = spark.read.load("neuralgia.csv",
                     format="csv", 
                     sep=",", 
                     inferSchema="true", 
                     header="true")
type(sdf)

pyspark.sql.dataframe.DataFrame

In [491]:
dfps = sdf.to_pandas_on_spark() #sdf.pandas_api() does the same conversion in Spark 3.2+
type(dfps)

pyspark.pandas.frame.DataFrame

In [493]:
sdf2 = dfps.to_spark()
type(sdf2)

pyspark.sql.dataframe.DataFrame

Schema and column names are important to know.

In [497]:
df = spark.read.load("neuralgia.csv",
                     format="csv", 
                     sep=",", 
                     inferSchema="true", 
                     header="true")
df.printSchema()
df.columns

root
 |-- Treatment: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- Pain: string (nullable = true)



['Treatment', 'Sex', 'Age', 'Duration', 'Pain']

We can return the data using `.take()`, `.show()`, and `.collect()`.

In [498]:
df.show(3)

+---------+---+---+--------+----+
|Treatment|Sex|Age|Duration|Pain|
+---------+---+---+--------+----+
|        P|  F| 68|       1|  No|
|        B|  M| 74|      16|  No|
|        P|  F| 67|      30|  No|
+---------+---+---+--------+----+
only showing top 3 rows



In [499]:
df.take(3)

[Row(Treatment='P', Sex='F', Age=68, Duration=1, Pain='No'),
 Row(Treatment='B', Sex='M', Age=74, Duration=16, Pain='No'),
 Row(Treatment='P', Sex='F', Age=67, Duration=30, Pain='No')]

Next, we'll look at common transformations, starting with column operations.

In [500]:
#select columns you want
df.select("Age")

DataFrame[Age: int]

In [501]:
df.Age

Column<'Age'>

In [503]:
df.select("Age", "Pain").show(3)

+---+----+
|Age|Pain|
+---+----+
| 68|  No|
| 74|  No|
| 67|  No|
+---+----+
only showing top 3 rows



`.withColumn()` can be used to create new columns.

In [510]:
df.withColumn("Current_Age", df.Age + 2).show(3)

+---------+---+---+--------+----+-----------+
|Treatment|Sex|Age|Duration|Pain|Current_Age|
+---------+---+---+--------+----+-----------+
|        P|  F| 68|       1|  No|         70|
|        B|  M| 74|      16|  No|         76|
|        P|  F| 67|      30|  No|         69|
+---------+---+---+--------+----+-----------+
only showing top 3 rows



Can also rename columns.

In [516]:
from pyspark.sql.functions import col
df \
  .withColumnRenamed('Age', 'Former_Age') \
  .withColumn("Current_Age", col("Former_Age") + 2) \
  .show(3)

+---------+---+----------+--------+----+-----------+
|Treatment|Sex|Former_Age|Duration|Pain|Current_Age|
+---------+---+----------+--------+----+-----------+
|        P|  F|        68|       1|  No|         70|
|        B|  M|        74|      16|  No|         76|
|        P|  F|        67|      30|  No|         69|
+---------+---+----------+--------+----+-----------+
only showing top 3 rows



We can use conditional logic with `when()` from `pyspark.sql.functions`.

In [518]:
from pyspark.sql.functions import *
df.withColumn("Age_cat", 
               when(df.Age>75, "75+")
              .when(df.Age>=70, "70-75")
              .otherwise("<70")) \
    .show(3)

+---------+---+---+--------+----+-------+
|Treatment|Sex|Age|Duration|Pain|Age_cat|
+---------+---+---+--------+----+-------+
|        P|  F| 68|       1|  No|    <70|
|        B|  M| 74|      16|  No|  70-75|
|        P|  F| 67|      30|  No|    <70|
+---------+---+---+--------+----+-------+
only showing top 3 rows
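
A chain of `when()` calls is checked in order, and the first condition that matches decides the value. A plain-Python analogue of the same logic, written here as a hypothetical `age_cat` function, makes that ordering explicit.

```python
def age_cat(age):
    """Plain-Python analogue of when(...).when(...).otherwise(...)."""
    if age > 75:        # first condition that matches wins
        return "75+"
    elif age >= 70:     # only reached when age <= 75
        return "70-75"
    else:               # the .otherwise() branch
        return "<70"

categories = [age_cat(a) for a in (68, 74, 67, 75, 76)]
print(categories)
```

Because the conditions are tested in order, the second one does not need an explicit upper bound.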



In [521]:
df.withColumn("Age_cat", 
               when(df.Age>75, "75+")
              .when(df.Age>=70, "70-75")
              .otherwise("<70")) \
   .withColumn("ln_Duration", log(df.Duration)) \
   .show(3)

+---------+---+---+--------+----+-------+------------------+
|Treatment|Sex|Age|Duration|Pain|Age_cat|       ln_Duration|
+---------+---+---+--------+----+-------+------------------+
|        P|  F| 68|       1|  No|    <70|               0.0|
|        B|  M| 74|      16|  No|  70-75| 2.772588722239781|
|        P|  F| 67|      30|  No|    <70|3.4011973816621555|
+---------+---+---+--------+----+-------+------------------+
only showing top 3 rows



We can also create our own functions with `udf`.

In [523]:
code_trt = udf(lambda x: "P Trt" if x == "P" else "Other")
df.withColumn('my_trt', code_trt('Treatment')).show(3)

+---------+---+---+--------+----+------+
|Treatment|Sex|Age|Duration|Pain|my_trt|
+---------+---+---+--------+----+------+
|        P|  F| 68|       1|  No| P Trt|
|        B|  M| 74|      16|  No| Other|
|        P|  F| 67|      30|  No| P Trt|
+---------+---+---+--------+----+------+
only showing top 3 rows
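
Since `udf` wraps an ordinary Python function, the logic can be checked on plain values before it is applied to a column. A small sketch, where `code_trt_py` is a hypothetical name for the unwrapped function:

```python
def code_trt_py(x):
    """The same logic as the lambda passed to udf, as a named function."""
    return "P Trt" if x == "P" else "Other"

# try it on a few treatment codes before wrapping it with udf()
labels = [code_trt_py(t) for t in ["P", "B", "P", "A"]]
print(labels)
```

When no return type is given, `udf` treats the result as a string, which suits this function.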



We can do the common operations on rows as well.

In [526]:
df.sort(df.Duration).show(3)

+---------+---+---+--------+----+
|Treatment|Sex|Age|Duration|Pain|
+---------+---+---+--------+----+
|        A|  M| 69|       1|  No|
|        B|  M| 70|       1|  No|
|        B|  F| 78|       1|  No|
+---------+---+---+--------+----+
only showing top 3 rows



In [528]:
df.sort(df.Duration, ascending = False).show(3)

+---------+---+---+--------+----+
|Treatment|Sex|Age|Duration|Pain|
+---------+---+---+--------+----+
|        B|  F| 72|      50|  No|
|        A|  M| 62|      42|  No|
|        B|  F| 69|      42|  No|
+---------+---+---+--------+----+
only showing top 3 rows



In [529]:
df.filter(df.Age < 65).show(3)

+---------+---+---+--------+----+
|Treatment|Sex|Age|Duration|Pain|
+---------+---+---+--------+----+
|        A|  F| 63|      27|  No|
|        A|  M| 62|      42|  No|
|        P|  F| 64|       1| Yes|
+---------+---+---+--------+----+
only showing top 3 rows



We can do basic summaries including grouped summaries!

In [530]:
df.select("Age", "Pain").describe().show()

+-------+-----------------+----+
|summary|              Age|Pain|
+-------+-----------------+----+
|  count|               60|  60|
|   mean|            70.05|null|
| stddev|5.189379637003748|null|
|    min|               59|  No|
|    max|               83| Yes|
+-------+-----------------+----+



In [543]:
df \
    .select(["Duration", "Age", "Treatment"]) \
    .agg(sum("Duration"), avg("Age"), count("Treatment")) \
    .show()

+-------------+--------+----------------+
|sum(Duration)|avg(Age)|count(Treatment)|
+-------------+--------+----------------+
|         1004|   70.05|              60|
+-------------+--------+----------------+



In [548]:
df.select(["Duration", "Age", "Treatment"]) \
    .groupBy("Treatment") \
    .sum() \
    .withColumnRenamed("sum(Duration)", "sum_Duration") \
    .withColumnRenamed("sum(Age)", "sum_Age") \
    .show()

+---------+------------+-------+
|Treatment|sum_Duration|sum_Age|
+---------+------------+-------+
|        B|         386|   1417|
|        A|         327|   1385|
|        P|         291|   1401|
+---------+------------+-------+



In [549]:
df.createOrReplaceTempView("df") #register the data frame as a temporary view; safe to re-run
spark.sql("SELECT sex, age FROM df LIMIT 4").show()

+---+---+
|sex|age|
+---+---+
|  F| 68|
|  M| 74|
|  F| 67|
|  M| 66|
+---+---+



### MapReduce Example Done via Spark SQL 

Let's redo it with Spark SQL!

In [571]:
#read in each chapter to a list element
my_chap = []
for i in range(1, 54):
    with open('dickens/chap' + str(i) + '.txt', 'r') as f:
        my_chap.append(f.read())

from pyspark.sql.types import StringType
sql_text = spark.createDataFrame(my_chap, StringType())
sql_text

DataFrame[value: string]

In [573]:
sql_text.take(1)

[Row(value='chapter i  treats of the place where oliver twist was born and of the circumstances attending his birth  among other public buildings in a certain town which for many reasons it will be prudent to refrain from mentioning and to which i will assign no fictitious name there is one anciently common to most towns great or small to wit a workhouse and in this workhouse was born on a day and date which i need not trouble myself to repeat inasmuch as it can be of no possible consequence to the reader in this stage of the business at all events the item of mortality whose name is prefixed to the head of this chapter  for a long time after it was ushered into this world of sorrow and trouble by the parish surgeon it remained a matter of considerable doubt whether the child would survive to bear any name at all in which case it is somewhat more than probable that these memoirs would never have appeared or if they had that being comprised within a couple of pages they would have posse

Ok, first we need to split the words out within each *row*. When we imported all of the SQL functions (`from pyspark.sql.functions import *`), there was a `split()` function that will work for us!

Note the way we use the function without `.withColumn()` by using `.select()`. This is a common way to use these functions without adding to the original data frame.

In [583]:
sql_text.select(split(sql_text.value, " ").alias("words")).show(4)

+--------------------+
|               words|
+--------------------+
|[chapter, i, , tr...|
|[chapter, ii, , t...|
|[chapter, iii, , ...|
|[chapter, iv, , o...|
+--------------------+
only showing top 4 rows



Ok, now we have a data frame with one column where each entry is a list of the words! This is closer. 

What we need to do now is **explode** out these lists. We imported a function called `explode()` that will split these values up and create new rows for each entry!

Notice how we call the function inside select again.

In [585]:
sql_text.select(explode(split(sql_text.value, " ")).alias("word")).show(4)

+-------+
|   word|
+-------+
|chapter|
|      i|
|       |
| treats|
+-------+
only showing top 4 rows
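
The effect of `split()` followed by `explode()` on the number of rows can be mimicked with lists of dictionaries. This is a rough plain-Python sketch on two made-up rows, not the Spark implementation.

```python
# two hypothetical rows standing in for the chapters
rows = [{"value": "chapter i  treats of"}, {"value": "chapter ii  treats"}]

# split(): one row in, one row out, holding a list of words
split_rows = [{"words": row["value"].split(" ")} for row in rows]

# explode(): one output row per list element
exploded = [{"word": w} for row in split_rows for w in row["words"]]

print(len(split_rows), len(exploded))
print(exploded[:4])
```

The split step keeps the row count the same, while the explode step produces one row per word, including the empty string left behind by the double space.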



Woo, almost there. Now we can filter out the empty strings.

In [593]:
my_words = sql_text.select(explode(split(sql_text.value, " ")).alias("word"))
my_words \
    .filter(my_words.word != "") \
    .show(4)

+-------+
|   word|
+-------+
|chapter|
|      i|
| treats|
|     of|
+-------+
only showing top 4 rows



Finally we group and count!

In [600]:
my_words \
    .filter(my_words.word != "") \
    .groupBy("word") \
    .count() \
    .show(5)

+---------+-----+
|     word|count|
+---------+-----+
|     some|  366|
|      few|  102|
|     hope|   64|
|destitute|    4|
|    still|   87|
+---------+-----+
only showing top 5 rows



Arrange it!

In [601]:
counts = my_words \
                .filter(my_words.word != "") \
                .groupBy("word") \
                .count()
counts.sort(counts["count"], ascending = False).show(10)

+----+-----+
|word|count|
+----+-----+
| the| 9272|
| and| 5138|
|  to| 3710|
|  of| 3686|
|   a| 3603|
|  in| 2286|
|  he| 2279|
| his| 2247|
|that| 1761|
| was| 1729|
+----+-----+
only showing top 10 rows

