# Spark: Getting Started
 * These instructions require a Mac with [Anaconda3](https://anaconda.com/) and [Homebrew](https://brew.sh/) installed.
 * Useful for small data only. For larger data, try [Databricks](https://databricks.com/).

## Step 0: Prerequisites & Installation

Run these commands in your terminal (just once).

```bash
# Make Homebrew aware of old versions of casks
brew tap caskroom/versions

# Install Java 1.8 (OpenJDK 8)
brew cask install adoptopenjdk8

# Install the current version of Spark
brew install apache-spark

# Install wget
brew install wget

# Install Py4J (connects PySpark to the Java Virtual Machine)
pip install py4j

# Add JAVA_HOME to .bash_profile (makes Java 1.8 your default JVM)
echo "export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)" >> ~/.bash_profile

# Add SPARK_HOME to .bash_profile
export SPARK_HOME=/usr/local/Cellar/apache-spark/2.4.3/libexec
echo "export SPARK_HOME=/usr/local/Cellar/apache-spark/2.4.3/libexec" >> ~/.bash_profile

# Add PySpark to PYTHONPATH
echo "export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH" >> ~/.bash_profile

# Update current environment
source ~/.bash_profile

```

## Step 1: Create a SparkSession with a SparkContext

In [1]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
spark

In [3]:
sc

## Step 2: Download some Amazon reviews (Toys & Games)

Download data (run this only once)
!wget http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Toys_and_Games_5.json.gz
!gunzip reviews_Toys_and_Games_5.json.gz

## Step 3: Create a Spark DataFrame

In [2]:
# Note that this is a local file. Some compute clusters don't have access to local files, so be careful.
jf = spark.read.json('data/reviews_Toys_and_Games_5.json')

In [3]:
#look at schema
jf

DataFrame[asin: string, helpful: array<bigint>, overall: double, reviewText: string, reviewTime: string, reviewerID: string, reviewerName: string, summary: string, unixReviewTime: bigint]

In [4]:
jf.limit(5).show()

+----------+-------+-------+--------------------+-----------+--------------+--------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|  reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+--------------+--------------------+--------------+
|0439893577| [0, 0]|    5.0|I like the item p...|01 29, 2014|A1VXOAVRGKGEAK|         Angie|      Magnetic board|    1390953600|
|0439893577| [1, 1]|    4.0|Love the magnet e...|03 28, 2014| A8R62G708TSCM|       Candace|it works pretty g...|    1395964800|
|0439893577| [1, 1]|    5.0|Both sides are ma...|01 28, 2013|A21KH420DK0ICA|capemaychristy|          love this!|    1359331200|
|0439893577| [0, 0]|    5.0|Bought one a few ...| 02 8, 2014| AR29QK6HPFYZ4|          dcrm|   Daughters love it|    1391817600|
|0439893577| [1, 1]|    4.0|I have a stainles...| 05 5, 2014| ACCH8EOML6FN5|          DoyZ|Great to have

In [5]:
jf.limit(5).toPandas()

Unnamed: 0,asin,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime
0,439893577,"[0, 0]",5.0,I like the item pricing. My granddaughter want...,"01 29, 2014",A1VXOAVRGKGEAK,Angie,Magnetic board,1390953600
1,439893577,"[1, 1]",4.0,Love the magnet easel... great for moving to d...,"03 28, 2014",A8R62G708TSCM,Candace,it works pretty good for moving to different a...,1395964800
2,439893577,"[1, 1]",5.0,Both sides are magnetic. A real plus when you...,"01 28, 2013",A21KH420DK0ICA,capemaychristy,love this!,1359331200
3,439893577,"[0, 0]",5.0,Bought one a few years ago for my daughter and...,"02 8, 2014",AR29QK6HPFYZ4,dcrm,Daughters love it,1391817600
4,439893577,"[1, 1]",4.0,I have a stainless steel refrigerator therefor...,"05 5, 2014",ACCH8EOML6FN5,DoyZ,Great to have so he can play with his alphabet...,1399248000


In [6]:
# Instead of reloading this file every time we want to look at it, we'll use persist() to cache it
jf.persist()

DataFrame[asin: string, helpful: array<bigint>, overall: double, reviewText: string, reviewTime: string, reviewerID: string, reviewerName: string, summary: string, unixReviewTime: bigint]

In [7]:
jf.head()

Row(asin='0439893577', helpful=[0, 0], overall=5.0, reviewText='I like the item pricing. My granddaughter wanted to mark on it but I wanted it just for the letters.', reviewTime='01 29, 2014', reviewerID='A1VXOAVRGKGEAK', reviewerName='Angie', summary='Magnetic board', unixReviewTime=1390953600)

In [8]:
df = jf.toPandas()

In [9]:
df.head()

Unnamed: 0,asin,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime
0,439893577,"[0, 0]",5.0,I like the item pricing. My granddaughter want...,"01 29, 2014",A1VXOAVRGKGEAK,Angie,Magnetic board,1390953600
1,439893577,"[1, 1]",4.0,Love the magnet easel... great for moving to d...,"03 28, 2014",A8R62G708TSCM,Candace,it works pretty good for moving to different a...,1395964800
2,439893577,"[1, 1]",5.0,Both sides are magnetic. A real plus when you...,"01 28, 2013",A21KH420DK0ICA,capemaychristy,love this!,1359331200
3,439893577,"[0, 0]",5.0,Bought one a few years ago for my daughter and...,"02 8, 2014",AR29QK6HPFYZ4,dcrm,Daughters love it,1391817600
4,439893577,"[1, 1]",4.0,I have a stainless steel refrigerator therefor...,"05 5, 2014",ACCH8EOML6FN5,DoyZ,Great to have so he can play with his alphabet...,1399248000


We'll look at the product ID (`asin`) and the rating (`overall`).

In [16]:
reviews_jf = jf[['asin', 'overall']]

In [17]:
reviews_jf.head()

Row(asin='0439893577', overall=5.0)

In [18]:
reviews_jf.limit(3)

DataFrame[asin: string, overall: double]

In [20]:
def show(jf, n=5):
    '''load portion of json dataframe into a pandas dataframe'''
    return jf.limit(n).toPandas()

In [21]:
show(reviews_jf)

Unnamed: 0,asin,overall
0,439893577,5.0
1,439893577,4.0
2,439893577,5.0
3,439893577,5.0
4,439893577,4.0


In [24]:
sorted_review_jf = reviews_jf.sort('overall')

In [26]:
show(sorted_review_jf)

Unnamed: 0,asin,overall
0,B000ZLZ1NU,1.0
1,B0010SGZEG,1.0
2,B0010AYFPU,1.0
3,B000Z9FT0M,1.0
4,B0010EJGSC,1.0


In [35]:
import pyspark.sql.functions as F

Let's use SQL to return what we want. The advantage over pandas is that Spark can query data much larger than one machine's memory, as long as it fits within the cluster, while pandas needs all of the data locally. Let's find the equivalent of pandas `value_counts` for the `overall` ratings in `reviews_jf`.

In [36]:
query = '''
SELECT overall, COUNT (*)
FROM reviews
GROUP BY overall'''

In [37]:
reviews_jf.createOrReplaceTempView('reviews')

In [38]:
output = spark.sql(query)

In [39]:
show(output)

Unnamed: 0,overall,count(1)
0,1.0,4707
1,4.0,37445
2,3.0,16357
3,2.0,6298
4,5.0,102790
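
To try the same `GROUP BY` query without a running Spark session, here is a minimal sketch on a tiny in-memory table. It uses Python's built-in `sqlite3` module as a stand-in for Spark SQL (an assumption for illustration only; the cells above run the query with `spark.sql`), and the five rows are the first five `asin`/`overall` pairs shown earlier. Spark does not guarantee row order for grouped results (note the unsorted output above), so the sketch adds an `ORDER BY`.

```python
import sqlite3

# Stand-in for the Spark temp view: a tiny in-memory SQLite table.
# The rows are the first five (asin, overall) pairs shown earlier.
sample_rows = [
    ("0439893577", 5.0),
    ("0439893577", 4.0),
    ("0439893577", 5.0),
    ("0439893577", 5.0),
    ("0439893577", 4.0),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE reviews (asin TEXT, overall REAL)")
conn.executemany("INSERT INTO reviews VALUES (?, ?)", sample_rows)

# Same shape as the Spark SQL query, with ORDER BY added for stable output.
query = """
SELECT overall, COUNT(*)
FROM reviews
GROUP BY overall
ORDER BY overall
"""
rating_counts = conn.execute(query).fetchall()
print(rating_counts)
conn.close()
```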


In [40]:
reviews_jf.rdd

MapPartitionsRDD[59] at javaToPython at NativeMethodAccessorImpl.java:0

In [41]:
jf.rdd

MapPartitionsRDD[61] at javaToPython at NativeMethodAccessorImpl.java:0

Recall that Spark DataFrames and RDDs are immutable, while pandas DataFrames are not.

In [42]:
row_one = jf.first()

In [43]:
row_one

Row(asin='0439893577', helpful=[0, 0], overall=5.0, reviewText='I like the item pricing. My granddaughter wanted to mark on it but I wanted it just for the letters.', reviewTime='01 29, 2014', reviewerID='A1VXOAVRGKGEAK', reviewerName='Angie', summary='Magnetic board', unixReviewTime=1390953600)

In [44]:
row_one['reviewText']

'I like the item pricing. My granddaughter wanted to mark on it but I wanted it just for the letters.'

How many words are in the `reviewText` of `row_one`?

In [45]:
def word_count(text):
    return len(text.split())

In [46]:
word_count(row_one['reviewText'])

20

Let's try to count the words in all the reviews

In [47]:
review_text_col = jf['reviewText']

In [48]:
jf.withColumn?

Signature: jf.withColumn(colName, col)
Docstring:
Returns a new :class:`DataFrame` by adding a column or replacing the
existing column that has the same name.

The column expression must be an expression over this DataFrame; attempting to add
a column from some other dataframe will raise an error.

:param colName: string, name of the new column.
:param col: a :class:`Column` expression for the new column.

>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name='Alice', age2=4), Row(age=5, name='Bob', age2=7)]

.. versionadded:: 1.3
File:      /anaconda3/envs/learn-env/lib/python3.6/site-packages/pyspark/sql/dataframe.py
Type:      method


In [33]:
F.udf?

Signature: F.udf(f=None, returnType=StringType)
Docstring:
Creates a user defined function (UDF).

.. note:: The user-defined functions are considered deterministic by default. Due to
    optimization, duplicate invocations may be eliminated or the function may even be invoked
    more times than it is present in the query. If your function is not deterministic, call
    `asNondeterministic` on the user defined function. E.g.:

>>> from pyspark.sql.types import IntegerType
>>> import random
>>> random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()

.. note:: The user-defined functions do not support conditional expressions or short circuiting
    in boolean expressions and it ends up with being executed all internally. If the functions
    can fail on special rows, the workaround is to incorporate th

In [59]:
from pyspark.sql.types import IntegerType
word_count_udf = F.udf(word_count, IntegerType())

In [60]:
counts_jf = jf.withColumn('wordcount', word_count_udf(review_text_col))

In [61]:
counts_jf.head()

Row(asin='0439893577', helpful=[0, 0], overall=5.0, reviewText='I like the item pricing. My granddaughter wanted to mark on it but I wanted it just for the letters.', reviewTime='01 29, 2014', reviewerID='A1VXOAVRGKGEAK', reviewerName='Angie', summary='Magnetic board', unixReviewTime=1390953600, wordcount=20)
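
One thing to watch for with UDFs: Spark passes `None` to the Python function for null values, and calling `.split()` on `None` raises an error. A minimal null-safe variant might look like the sketch below. `safe_word_count` is a hypothetical name, and whether this dataset actually contains null `reviewText` values is not checked here.

```python
def safe_word_count(text):
    """Count whitespace-separated words, treating a missing value as zero words."""
    if text is None:
        return 0
    return len(text.split())


# The review text of the first row shown above.
review = ('I like the item pricing. My granddaughter wanted to mark on it '
          'but I wanted it just for the letters.')
print(safe_word_count(review))
print(safe_word_count(None))
```

It could be wrapped the same way as `word_count`, for example with `F.udf(safe_word_count, IntegerType())`.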

### Let's use SQL!

In [52]:
jf.createOrReplaceTempView('reviews')
spark.udf.register('word_count', word_count)


<function __main__.word_count(text)>

In [62]:
query = '''SELECT asin
, overall
, reviewText
, word_count(reviewText)
FROM reviews'''


In [63]:
counts_jf = spark.sql(query)

### Steps:
1. Create a Python function <br>
`def word_count(text):` <br>
`    return len(text.split())` <br>
2. Wrap it as a UDF so it can be applied to DataFrame columns <br>
`import pyspark.sql.functions as F` <br>
`from pyspark.sql.types import IntegerType` <br>
`word_count_udf = F.udf(word_count, IntegerType())` <br>
3. Give the DataFrame a name that SQL can refer to <br>
`jf.createOrReplaceTempView('reviews')` <br>
4. Register the function so that SQL queries can call it <br>
`spark.udf.register('word_count', word_count)` <br>
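
The register-then-query pattern in these steps is not specific to Spark. As a rough analogue that runs without a cluster, the sketch below uses Python's built-in `sqlite3` module, where `create_function` plays the role of `spark.udf.register`. SQLite here is an assumption for illustration, not part of the tutorial's setup, and the single row is the first review shown earlier.

```python
import sqlite3


def word_count(text):
    """Step 1: a plain Python function."""
    return len(text.split())


conn = sqlite3.connect(":memory:")

# Step 3 analogue: give the data a name that SQL can refer to.
conn.execute("CREATE TABLE reviews (asin TEXT, overall REAL, reviewText TEXT)")
conn.execute(
    "INSERT INTO reviews VALUES (?, ?, ?)",
    ("0439893577", 5.0,
     "I like the item pricing. My granddaughter wanted to mark on it "
     "but I wanted it just for the letters."),
)

# Step 4 analogue: register the Python function under a SQL-visible name.
# (Step 2, wrapping with F.udf, is used for Spark's DataFrame API and has
# no counterpart in this sketch.)
conn.create_function("word_count", 1, word_count)

rows = conn.execute(
    "SELECT asin, overall, word_count(reviewText) FROM reviews"
).fetchall()
print(rows)
conn.close()
```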

In [55]:
def count_all_the_things(text):
    return [len(text.split()), len(text.split())]

In [56]:
from pyspark.sql.types import ArrayType, IntegerType
count_udf = F.udf(count_all_the_things, ArrayType(IntegerType()))

In [64]:
counts_jf = jf.withColumn('counts', count_udf(jf['reviewText']))

In [65]:
counts_jf.limit(5)

DataFrame[asin: string, helpful: array<bigint>, overall: double, reviewText: string, reviewTime: string, reviewerID: string, reviewerName: string, summary: string, unixReviewTime: bigint, counts: array<int>]

## learn.co lab stuff:

## Understanding SparkContext - Lab

In [44]:
dir()

['ArrayType',
 'F',
 'In',
 'IntegerType',
 'Out',
 '_',
 '_11',
 '_13',
 '_14',
 '_16',
 '_18',
 '_2',
 '_23',
 '_24',
 '_25',
 '_27',
 '_28',
 '_3',
 '_30',
 '_36',
 '_37',
 '_43',
 '_5',
 '_7',
 '_8',
 '_9',
 '__',
 '___',
 '__builtin__',
 '__builtins__',
 '__doc__',
 '__loader__',
 '__name__',
 '__package__',
 '__spec__',
 '_dh',
 '_i',
 '_i1',
 '_i10',
 '_i11',
 '_i12',
 '_i13',
 '_i14',
 '_i15',
 '_i16',
 '_i17',
 '_i18',
 '_i19',
 '_i2',
 '_i20',
 '_i21',
 '_i22',
 '_i23',
 '_i24',
 '_i25',
 '_i26',
 '_i27',
 '_i28',
 '_i29',
 '_i3',
 '_i30',
 '_i31',
 '_i32',
 '_i33',
 '_i34',
 '_i35',
 '_i36',
 '_i37',
 '_i38',
 '_i39',
 '_i4',
 '_i40',
 '_i41',
 '_i42',
 '_i43',
 '_i44',
 '_i5',
 '_i6',
 '_i7',
 '_i8',
 '_i9',
 '_ih',
 '_ii',
 '_iii',
 '_oh',
 'count_all_the_things',
 'count_udf',
 'counts_df',
 'df',
 'exit',
 'get_ipython',
 'output',
 'pf',
 'pyspark',
 'query',
 'quit',
 'review_text_col',
 'reviews_df',
 'row_one',
 'sc',
 'show',
 'sorted_review_df',
 'spark',
 'word_co

In [45]:
help(sc)

Help on SparkContext in module pyspark.context object:

class SparkContext(builtins.object)
 |  Main entry point for Spark functionality. A SparkContext represents the
 |  connection to a Spark cluster, and can be used to create L{RDD} and
 |  broadcast variables on that cluster.
 |  
 |  Methods defined here:
 |  
 |  __enter__(self)
 |      Enable 'with SparkContext(...) as sc: app(sc)' syntax.
 |  
 |  __exit__(self, type, value, trace)
 |      Enable 'with SparkContext(...) as sc: app' syntax.
 |      
 |      Specifically stop the context on exit of the with block.
 |  
 |  __getnewargs__(self)
 |  
 |  __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)
 |      Create a new SparkContext. At least the master and app name should be set,
 |      either through the named parameters here or through C{conf}.
 |      

Let's check a few SparkContext attributes, including `SparkContext.version` and `SparkContext.defaultParallelism`, to see the current version of Apache Spark and the number of cores being used for parallel processing.

In [67]:
sc.version

'2.4.3'

In [68]:
sc.defaultParallelism

8

In [69]:
sc.appName

'pyspark-shell'

We can access the complete configuration settings (including all defaults) for the current Spark context using the `sc._conf.getAll()` method.

A SparkContext can be shut down with the `SparkContext.stop()` method. To shut down the current context, call `sc.stop()`.

## Resilient Distributed Datasets (RDDs) - Lab

In [74]:
data = list(range(1000))
len(data)

1000

In [78]:
#create rdd with 10 partitions
rdd = sc.parallelize(data, numSlices=10)
print(type(rdd))
# <class 'pyspark.rdd.RDD'>

<class 'pyspark.rdd.RDD'>


In [79]:
rdd.getNumPartitions()

10

Basic descriptive RDD actions
Let's perform some basic operations on our RDD. In the cell below, use the methods: <br>

count: returns the total count of items in the RDD <br>
first: returns the first item in the RDD <br>
take: returns the first n items in the RDD <br>
top: returns the n largest items, in descending order <br>
collect: returns everything from your RDD<br>

Note that in a big data context, `collect` pulls every element of the RDD back to the driver, so it can take a very long time (or exhaust the driver's memory) and should be used with care!
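
To build intuition for what each action returns, here is a plain-Python analogue on an ordinary list. This is only a sketch of the results, not of how Spark computes them: a real RDD is split across partitions and evaluated lazily.

```python
import heapq

data = list(range(1000))

n_items = len(data)                 # like rdd.count()
first_item = data[0]                # like rdd.first()
first_six = data[:6]                # like rdd.take(6)
top_six = heapq.nlargest(6, data)   # like rdd.top(6): largest first
everything = list(data)             # like rdd.collect(): a full copy in local memory

print(n_items, first_item, first_six, top_six, len(everything))
```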

In [81]:
rdd.count()

1000

In [82]:
rdd.first()

0

In [83]:
rdd.take(6)

[0, 1, 2, 3, 4, 5]

In [84]:
rdd.top(6)

[999, 998, 997, 996, 995, 994]

In [85]:
# rdd.collect()

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99,
 100,
 101,
 102,
 103,
 104,
 105,
 106,
 107,
 108,
 109,
 110,
 111,
 112,
 113,
 114,
 115,
 116,
 117,
 118,
 119,
 120,
 121,
 122,
 123,
 124,
 125,
 126,
 127,
 128,
 129,
 130,
 131,
 132,
 133,
 134,
 135,
 136,
 137,
 138,
 139,
 140,
 141,
 142,
 143,
 144,
 145,
 146,
 147,
 148,
 149,
 150,
 151,
 152,
 153,
 154,
 155,
 156,
 157,
 158,
 159,
 160,
 161,
 162,
 163,
 164,
 165,
 166,
 167,
 168,
 169,
 170,
 171,
 172,
 173,
 174,
 175,
 176,
 177,
 178,
 179,
 180,
 181,
 182,
 183,
 184,


In [86]:
import random
import numpy as np

Now we'll do some mapping.

In [88]:
nums = np.array(range(1,1001))
sales_figures = nums * np.random.rand(1000)
# sales_figures

In [89]:
#create an RDD called price_items using the newly created data with 10 slices.
price_items = sc.parallelize(sales_figures, numSlices=10)

In [90]:
price_items.take(6)

[0.36798328687304893,
 1.4905084514422406,
 2.1266286810769466,
 2.6200923771108107,
 1.2412576789528034,
 0.6763437909253847]

In [100]:
price_items.top(15)

[968.9029422152379,
 965.951440353942,
 909.1750931567626,
 895.2822647884403,
 887.8316179652227,
 877.2716755368646,
 864.6452287155496,
 854.7135591488441,
 852.8696867798243,
 846.2444427560786,
 844.2846080127582,
 842.5498471657254,
 841.0961082873267,
 831.1327685836549,
 829.8606722039958]

Now let's perform some operations on this simple dataset. To begin, account for how much money BuyStuff will receive after sales tax has been applied (assume a sales tax of 8%). Create a function called `sales_tax` that returns the amount of money the company will receive after the sales tax has been applied. The function takes one parameter:

item: (float) the pre-tax amount to be reduced by the sales tax. <br>

Apply that function to the RDD with the `map` method and assign the result to a variable named `revenue_minus_tax`.

In [103]:
def sales_tax(num):
    return num*.92

In [104]:
revenue_minus_tax = price_items.map(sales_tax)

In [105]:
revenue_minus_tax.sum()

228238.12791059093

In [106]:
revenue_minus_tax.top(5)

[891.3907068380189,
 888.6753251256266,
 836.4410857042217,
 823.6596836053651,
 816.805088528005]

Note that you can also use lambda functions if you want to quickly perform simple operations on data without creating a function. Let's assume that BuyStuff has also decided to offer a 10% discount on all of their items on the pre-tax amounts of each item. Use a lambda function within a map method to apply the additional 10% loss in revenue for BuyStuff and assign the transformed RDD to a new RDD called discounted.

In [107]:
discounted = revenue_minus_tax.map(lambda x : x*0.9)

In [108]:
discounted.sum()

205414.31511953185

You can also chain methods together in Spark. In one line, remove the tax and discount from BuyStuff's revenue, then use an action to see the 15 costliest items.

In [109]:
price_items.map(sales_tax).map(lambda x : x*0.9).top(15)

[802.2516361542171,
 799.807792613064,
 752.7969771337995,
 741.2937152448286,
 735.1245796752045,
 726.380947344524,
 715.9262493764751,
 707.7028269752429,
 706.1761006536946,
 700.690398602033,
 699.0676554345638,
 697.6312734532207,
 696.4275776619066,
 688.1779323872663,
 687.1246365849087]

In [110]:
discounted.top(15)

[802.2516361542171,
 799.807792613064,
 752.7969771337995,
 741.2937152448286,
 735.1245796752045,
 726.380947344524,
 715.9262493764751,
 707.7028269752429,
 706.1761006536946,
 700.690398602033,
 699.0676554345638,
 697.6312734532207,
 696.4275776619066,
 688.1779323872663,
 687.1246365849087]
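
The two outputs above match because chaining two maps is the same as applying both factors in a single pass. A small plain-Python check of that arithmetic, using the three largest pre-tax prices from the `price_items.top(15)` output (built-in `map` stands in for the RDD method here):

```python
def sales_tax(num):
    return num * 0.92


# The three largest pre-tax prices from the price_items.top(15) output above.
prices = [968.9029422152379, 965.951440353942, 909.1750931567626]

# Two chained maps, mirroring price_items.map(sales_tax).map(lambda x: x * 0.9).
chained = list(map(lambda x: x * 0.9, map(sales_tax, prices)))

# One map applying both factors at once.
combined = [x * 0.92 * 0.9 for x in prices]

for a, b in zip(chained, combined):
    print(round(a, 6), round(b, 6))
```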

We are able to see the full lineage of all the operations that have been performed on an RDD by using the RDD.toDebugString() method. As your transformations become more complex, you are encouraged to call this method to get a better understanding of the dependencies between RDDs. Try calling it on the discounted RDD to see what RDDs it is dependent on.

In [111]:
discounted.toDebugString()

b'(10) PythonRDD[112] at RDD at PythonRDD.scala:53 []\n |   ParallelCollectionRDD[98] at parallelize at PythonRDD.scala:195 []'

Map vs. flatMap <br>
Depending on how you want your output structured, you might want to use `flatMap` rather than a simple `map`. Let's look at how it differs from the standard `map`. Say we want to keep the original amount BuyStuff receives for each item as well as the new amount after the tax and discount are applied. Create a map function that returns a tuple of (original price, post-discount price).

In [113]:
mapped = price_items.map(lambda x: (x, x*0.92 *0.9))
print(mapped.count())
print(mapped.top(10))

1000
[(968.9029422152379, 802.2516361542171), (965.951440353942, 799.807792613064), (909.1750931567626, 752.7969771337995), (895.2822647884403, 741.2937152448286), (887.8316179652227, 735.1245796752045), (877.2716755368646, 726.380947344524), (864.6452287155496, 715.9262493764751), (854.7135591488441, 707.7028269752429), (852.8696867798243, 706.1761006536946), (846.2444427560786, 700.690398602033)]


Note that we have 1000 tuples created to our specification. Let's take a look at how flatMap differs in its implementation. Use the flatMap method with the same function you created above.

In [114]:
flat_mapped = price_items.flatMap(lambda x : (x, x*0.92*0.9 ))
print(flat_mapped.count())
print(flat_mapped.top(10))

2000
[968.9029422152379, 965.951440353942, 909.1750931567626, 895.2822647884403, 887.8316179652227, 877.2716755368646, 864.6452287155496, 854.7135591488441, 852.8696867798243, 846.2444427560786]
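
The count doubles because `flatMap` unpacks each returned tuple into separate elements, whereas `map` keeps one tuple per input. That is also why `top(10)` on the flattened RDD shows only original prices: the ten largest originals all exceed the largest discounted value. A plain-Python sketch of the difference, using made-up prices and `itertools.chain` as a stand-in for `flatMap`:

```python
from itertools import chain

prices = [100.0, 200.0, 300.0]  # made-up prices for illustration


def with_discount(x):
    return (x, x * 0.92 * 0.9)


# map: one output element (a tuple) per input element.
mapped = [with_discount(x) for x in prices]

# flatMap: each returned tuple is unpacked into the output.
flat_mapped = list(chain.from_iterable(with_discount(x) for x in prices))

print(len(mapped), len(flat_mapped))
print(mapped[0])
print(flat_mapped[:2])
```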


Filter <br>
After meeting with some external consultants, BuyStuff has determined that its business will be more profitable if it focuses on higher-ticket items. Now, use the `filter` method to select items that bring in more than $300 after tax and discount have been removed. `filter` is a transformation that keeps only the items for which a predicate function returns `True`. In the cell below: <br>

use a lambda function within `filter` to meet the consultants' specification, and assign the resulting RDD to `selected_items` <br>
calculate the total number of items remaining in BuyStuff's inventory <br>


In [115]:
selected_items = discounted.filter(lambda x: x>300)
selected_items.count()

274
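
As a plain-Python sketch of the same idea, the built-in `filter` keeps an element only when the predicate returns `True`. The first two amounts below appear in the outputs of this lab; the last two are made up for illustration.

```python
# Two amounts from the lab's outputs, plus two made-up values below the cutoff.
amounts = [802.2516361542171, 303.4624427773786, 299.99, 150.0]

# Keep only the amounts for which the predicate is True.
selected = list(filter(lambda x: x > 300, amounts))

print(len(selected), selected)
```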

Reduce <br>
Now it's time to figure out how much money BuyStuff would make from selling one of each of its items after reducing its inventory. Use the `reduce` method with a lambda function to add up all of the values in the RDD. Your lambda function should take two arguments.

In [116]:
selected_items.reduce(lambda x,y :x + y)

124934.06646485813

In [118]:
from operator import add
selected_items.reduce(add)

124934.06646485813
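
Both cells give the same total because `operator.add` is simply the named version of `lambda x, y: x + y`. The sketch below uses `functools.reduce` on made-up integers to show the pairwise folding, and then mimics (very loosely) how Spark reduces each partition separately before combining the partial results. That per-partition behaviour is the reason the function passed to `reduce` should be commutative and associative.

```python
from functools import reduce
from operator import add

values = [1, 2, 3, 4, 5, 6]  # made-up values for illustration

# reduce folds the values pairwise: ((((1 + 2) + 3) + 4) + 5) + 6
total = reduce(lambda x, y: x + y, values)

# Loose imitation of partitioned reduction: reduce each chunk on its own,
# then reduce the partial results.
partitions = [values[:3], values[3:]]
partials = [reduce(add, part) for part in partitions]
total_from_partitions = reduce(add, partials)

print(total, partials, total_from_partitions)
```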

The time has come for BuyStuff to open up shop and start selling its goods. It only has one of each item, but it's allowing 50 lucky users to buy as many items as they want while they remain in stock. Within seconds, BuyStuff is sold out. Below, you'll find the sales data in an RDD with tuples of (user, item bought).

In [120]:
import random
random.seed(42)
# generating simulated users that have bought each item
sales_data = selected_items.map(lambda x: (random.randint(1,50),x))

sales_data.take(7)

[(22, 303.4624427773786),
 (10, 315.6169912192194),
 (13, 337.7230685094742),
 (11, 348.07129244539885),
 (37, 342.4714746018663),
 (15, 350.23475907942833),
 (15, 334.146155059639)]

It's time to determine some basic statistics about BuyStuff users.

Let's start off by creating an RDD that determines how much each user spent in total. To do this we can use a method called reduceByKey to perform reducing operations while grouping by keys. After you have calculated the total, use the sortBy method on the RDD to rank the users from highest spending to least spending.

In [123]:
total_spent = sales_data.reduceByKey(lambda x,y :x + y)
total_spent.take(10)

[(50, 11542.767611185882),
 (10, 3707.4074331473635),
 (40, 1885.1598469952055),
 (31, 5252.41224347783),
 (11, 4267.497122488841),
 (41, 4711.003807706706),
 (1, 1093.0944176573632),
 (2, 6889.602162272571),
 (32, 2363.8882710575795),
 (12, 6574.032284886426)]

In [125]:
total_spent.sortBy(lambda x: x[1],ascending = False).collect()

[(50, 11542.767611185882),
 (23, 10436.863310364082),
 (2, 6889.602162272571),
 (48, 6678.452110419206),
 (12, 6574.032284886426),
 (15, 5805.965089662124),
 (6, 5618.594660703813),
 (31, 5252.41224347783),
 (47, 4955.19240321956),
 (37, 4806.55955888658),
 (41, 4711.003807706706),
 (46, 4633.458945130777),
 (4, 4518.740547895117),
 (11, 4267.497122488841),
 (17, 3800.2752688267865),
 (10, 3707.4074331473635),
 (45, 3399.9164830691407),
 (7, 2601.3877711336913),
 (29, 2529.2351428588377),
 (35, 2491.7212012131413),
 (32, 2363.8882710575795),
 (44, 2257.807667032602),
 (28, 2144.744044546806),
 (24, 2141.355373192837),
 (40, 1885.1598469952055),
 (18, 1732.0120341038255),
 (22, 1636.7996931104503),
 (5, 1451.0508290516796),
 (13, 1117.4646435338743),
 (1, 1093.0944176573632),
 (9, 799.807792613064),
 (36, 651.8393595868904),
 (16, 437.95733382747915)]
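
To see what `reduceByKey` followed by `sortBy` computes, here is a plain-Python sketch over the seven (user, amount) pairs shown by `sales_data.take(7)` above. A dictionary stands in for Spark's per-key grouping, which is an illustration of the result rather than of Spark's shuffle mechanics.

```python
from collections import defaultdict

# The seven (user, amount) pairs shown by sales_data.take(7) above.
sales_sample = [
    (22, 303.4624427773786),
    (10, 315.6169912192194),
    (13, 337.7230685094742),
    (11, 348.07129244539885),
    (37, 342.4714746018663),
    (15, 350.23475907942833),
    (15, 334.146155059639),
]

# Like reduceByKey(lambda x, y: x + y): combine all values that share a key.
totals = defaultdict(float)
for user, amount in sales_sample:
    totals[user] += amount

# Like sortBy(lambda x: x[1], ascending=False): highest spender first.
ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
print(ranked[:3])
```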

Next, let's determine how many items were bought per user. This can be solved in one line using an RDD method. After you've counted the total number of items bought per person, sort the users from most number of items bought to least number of items. Time to start a customer loyalty program!

In [126]:
total_items = sales_data.countByKey()
sorted(total_items.items(),key=lambda kv:kv[1],reverse=True)

[(50, 25),
 (23, 24),
 (2, 16),
 (48, 15),
 (12, 15),
 (6, 12),
 (31, 12),
 (46, 11),
 (47, 11),
 (15, 11),
 (37, 10),
 (4, 10),
 (11, 9),
 (41, 9),
 (45, 8),
 (10, 8),
 (17, 8),
 (32, 6),
 (29, 6),
 (7, 6),
 (44, 6),
 (35, 5),
 (24, 5),
 (40, 4),
 (28, 4),
 (18, 4),
 (22, 4),
 (13, 3),
 (1, 2),
 (5, 2),
 (16, 1),
 (9, 1),
 (36, 1)]

In [127]:
total_items.items()

dict_items([(6, 12), (2, 16), (37, 10), (23, 24), (32, 6), (46, 11), (29, 6), (7, 6), (50, 25), (48, 15), (31, 12), (44, 6), (12, 15), (47, 11), (45, 8), (4, 10), (11, 9), (10, 8), (35, 5), (24, 5), (15, 11), (41, 9), (40, 4), (17, 8), (28, 4), (18, 4), (22, 4), (13, 3), (1, 2), (5, 2), (16, 1), (9, 1), (36, 1)])
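
Unlike `reduceByKey`, which returns another RDD, `countByKey` is an action that brings back a local Python dictionary, which is why the built-in `sorted` is used above. A rough plain-Python counterpart is `collections.Counter`, sketched here on the user IDs from the `sales_data.take(7)` output:

```python
from collections import Counter

# Keys (user IDs) of the seven pairs shown by sales_data.take(7) above.
users = [22, 10, 13, 11, 37, 15, 15]

# Like sales_data.countByKey(): number of items per user.
items_per_user = Counter(users)

# Most items first, similar to the sorted(...) call above.
print(items_per_user.most_common(2))
```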

## Word Count with Map-Reduce - Lab