# Spark: Getting Started
 * These instructions require a Mac with [Anaconda3](https://anaconda.com/) and [Homebrew](https://brew.sh/) installed.
 * Useful for small data only. For larger data, try [Databricks](https://databricks.com/).

## Step 0: Prerequisites & Installation

Run these commands in your terminal (just once).

```bash
# Make Homebrew aware of old versions of casks
brew tap caskroom/versions

# Install Java 1.8 (OpenJDK 8)
brew cask install adoptopenjdk8

# Install the current version of Spark
brew install apache-spark

# Install Py4J (connects PySpark to the Java Virtual Machine)
pip install py4j

# Add JAVA_HOME to .bash_profile (makes Java 1.8 your default JVM)
echo "# Apache Spark" >> ~/.bash_profile
echo "export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)" >> ~/.bash_profile

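# Add SPARK_HOME to .bash_profile
# (adjust 2.4.3 to the version Homebrew installed; check with `brew info apache-spark`)
echo "export SPARK_HOME=/usr/local/Cellar/apache-spark/2.4.3/libexec" >> ~/.bash_profile

# Add PySpark to PYTHONPATH in .bash_profile
# (single quotes keep $SPARK_HOME and $PYTHONPATH unexpanded until .bash_profile is sourced)
echo 'export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH' >> ~/.bash_profile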

# Update current environment
source ~/.bash_profile

```
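Once the profile has been sourced, it can help to confirm that Python sees the same settings. The snippet below is a minimal sketch using only the standard library; the helper name `check_spark_env` is hypothetical and not part of Spark.

```python
import importlib.util
import os


def check_spark_env(environ=None):
    """Report which pieces of the local Spark setup are visible to Python.

    `environ` defaults to the real environment; pass a dict to test the helper.
    """
    environ = os.environ if environ is None else environ
    return {
        "JAVA_HOME": environ.get("JAVA_HOME"),
        "SPARK_HOME": environ.get("SPARK_HOME"),
        # True when `import pyspark` would succeed in this interpreter
        "pyspark_importable": importlib.util.find_spec("pyspark") is not None,
    }


if __name__ == "__main__":
    for name, value in check_spark_env().items():
        print(f"{name}: {value}")
```

A value of `None` for `JAVA_HOME` or `SPARK_HOME` suggests the exports did not reach the current shell, for example because the notebook was started from a terminal opened before `.bash_profile` was updated.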

## Step 1: Create a SparkSession with a SparkContext

In [1]:
# Run Spark locally, not connected to a cluster
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
spark

In [3]:
sc
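The builder call in Step 1 relies on defaults. As a sketch, the local master and an application name can also be set explicitly; the function name `make_local_session` and the default app name are assumptions made for illustration. Note that `getOrCreate()` returns any session that already exists, in which case the master setting here has no effect.

```python
def make_local_session(app_name="spark-getting-started", cores="*"):
    """Create (or reuse) a SparkSession that runs on this machine only.

    `cores` is the number of local worker threads; "*" means one per CPU core.
    """
    # Imported inside the function so the module loads even without PySpark.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .master(f"local[{cores}]")  # run locally instead of on a cluster
        .appName(app_name)          # name shown in the Spark UI
        .getOrCreate()
    )
```

Calling `make_local_session(cores=2)` would limit Spark to two local threads, which can be handy on a laptop.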

## Step 2: Download some Amazon reviews (Toys & Games)

In [4]:
# Download data (run this only once)
!wget http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Toys_and_Games_5.json.gz
!gunzip reviews_Toys_and_Games_5.json.gz

--2019-08-29 10:59:47--  http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Toys_and_Games_5.json.gz
Resolving snap.stanford.edu (snap.stanford.edu)... 171.64.75.80
Connecting to snap.stanford.edu (snap.stanford.edu)|171.64.75.80|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 42057555 (40M) [application/x-gzip]
Saving to: ‘reviews_Toys_and_Games_5.json.gz’


2019-08-29 10:59:51 (8.65 MB/s) - ‘reviews_Toys_and_Games_5.json.gz’ saved [42057555/42057555]
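If `wget` or `gunzip` is not available, the same download can be sketched with the Python standard library. The helper names `download` and `gunzip` are hypothetical; unlike the `gunzip` command, this version leaves the `.gz` file in place.

```python
import gzip
import shutil
import urllib.request

URL = ("http://snap.stanford.edu/data/amazon/productGraph/"
       "categoryFiles/reviews_Toys_and_Games_5.json.gz")


def download(url, gz_path):
    """Fetch `url` and save it to `gz_path`."""
    urllib.request.urlretrieve(url, gz_path)
    return gz_path


def gunzip(gz_path, out_path):
    """Decompress `gz_path` into `out_path`, streaming so the file never sits fully in memory."""
    with gzip.open(gz_path, "rb") as src, open(out_path, "wb") as dst:
        shutil.copyfileobj(src, dst)
    return out_path


if __name__ == "__main__":
    gz_file = download(URL, "reviews_Toys_and_Games_5.json.gz")
    gunzip(gz_file, "reviews_Toys_and_Games_5.json")
```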



## Step 3: Create a Spark DataFrame

In [5]:
df = spark.read.json('reviews_Toys_and_Games_5.json')

In [8]:
df.schema

StructType(List(StructField(asin,StringType,true),StructField(helpful,ArrayType(LongType,true),true),StructField(overall,DoubleType,true),StructField(reviewText,StringType,true),StructField(reviewTime,StringType,true),StructField(reviewerID,StringType,true),StructField(reviewerName,StringType,true),StructField(summary,StringType,true),StructField(unixReviewTime,LongType,true)))
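To cross-check the inferred schema against the raw file, a few records can be read without Spark. This sketch assumes the file holds one JSON object per line (the layout `spark.read.json` expects by default); `peek_json_lines` is a hypothetical helper name.

```python
import json
from itertools import islice


def peek_json_lines(path, n=1):
    """Return the first `n` records of a JSON Lines file as Python dicts."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in islice(f, n)]
```

For example, `sorted(peek_json_lines('reviews_Toys_and_Games_5.json')[0])` lists the field names of the first record, which should line up with the `StructField` names above.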

A Spark cluster can span many computers. With HDFS, if one computer fails, the data is still backed up on other computers. Spark addresses Hadoop's problem of being slow: it does its work in RAM, which is faster, and so cheaper per operation, than going to disk.

But what if one of the computers still fails? Instead of keeping multiple copies, Spark remembers where it loaded the data from. In Spark everything is immutable: nothing is changed in place, and every operation creates a new dataset without holding on to the previous one. If a machine fails, Spark goes back to where it originally got the data and rebuilds what was lost.
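The recovery idea can be illustrated with a toy class in plain Python. This is a sketch of the concept only, not Spark's implementation; `LineageDataset` and its methods are hypothetical names.

```python
class LineageDataset:
    """Toy stand-in for a Spark dataset: it stores a recipe, not a backup copy."""

    def __init__(self, source, steps=()):
        self._source = source       # zero-argument function that re-reads the data
        self._steps = tuple(steps)  # transformations applied after loading
        self._cached = None         # in-memory result; may be lost at any time

    def map(self, fn):
        # Immutable: return a new dataset with one more step; self is untouched.
        return LineageDataset(self._source, self._steps + (fn,))

    def collect(self):
        if self._cached is None:
            # Nothing in memory (first use, or after a failure): replay the recipe.
            rows = self._source()
            for fn in self._steps:
                rows = [fn(row) for row in rows]
            self._cached = rows
        return list(self._cached)

    def simulate_failure(self):
        # Pretend the machine holding the in-memory result went away.
        self._cached = None
```

After `simulate_failure()`, the next `collect()` calls the source function again and reapplies every step, so the result is rebuilt without a second stored copy.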

Spark evaluates everything lazily. Unless you ask it to persist a dataset, Spark reloads the data every time you use it; `persist` means "don't throw it away."
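The effect of persisting can be mimicked in a few lines of plain Python. This is a conceptual sketch, not how Spark is implemented; `LazyDataset` and its `loads` counter are hypothetical.

```python
class LazyDataset:
    """Toy dataset that loads on demand and only keeps data when asked to."""

    def __init__(self, loader):
        self._loader = loader   # zero-argument function that reads the data
        self._persist = False
        self._data = None
        self.loads = 0          # how many times the loader has actually run

    def persist(self):
        # Lazy: this only sets a flag; nothing is loaded yet.
        self._persist = True
        return self

    def _rows(self):
        if self._data is not None:
            return self._data   # reuse the kept copy
        self.loads += 1
        rows = self._loader()
        if self._persist:
            self._data = rows   # keep it for the next action
        return rows

    def count(self):
        # An action: this is what finally triggers loading.
        return len(self._rows())
```

Calling `count()` twice on a plain `LazyDataset` runs the loader twice, while calling it twice after `persist()` runs the loader once.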

In [6]:
df.persist()  # tell Spark to keep df cached instead of discarding it after each use

DataFrame[asin: string, helpful: array<bigint>, overall: double, reviewText: string, reviewTime: string, reviewerID: string, reviewerName: string, summary: string, unixReviewTime: bigint]

In [9]:
df.limit(5).toPandas()

| | asin | helpful | overall | reviewText | reviewTime | reviewerID | reviewerName | summary | unixReviewTime |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 0439893577 | [0, 0] | 5.0 | I like the item pricing. My granddaughter want... | 01 29, 2014 | A1VXOAVRGKGEAK | Angie | Magnetic board | 1390953600 |
| 1 | 0439893577 | [1, 1] | 4.0 | Love the magnet easel... great for moving to d... | 03 28, 2014 | A8R62G708TSCM | Candace | it works pretty good for moving to different a... | 1395964800 |
| 2 | 0439893577 | [1, 1] | 5.0 | Both sides are magnetic. A real plus when you... | 01 28, 2013 | A21KH420DK0ICA | capemaychristy | love this! | 1359331200 |
| 3 | 0439893577 | [0, 0] | 5.0 | Bought one a few years ago for my daughter and... | 02 8, 2014 | AR29QK6HPFYZ4 | dcrm | Daughters love it | 1391817600 |
| 4 | 0439893577 | [1, 1] | 4.0 | I have a stainless steel refrigerator therefor... | 05 5, 2014 | ACCH8EOML6FN5 | DoyZ | Great to have so he can play with his alphabet... | 1399248000 |


In [10]:
df.count()  # faster the second time you run it, because df is persisted

167597
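To see the speed-up from persisting rather than take it on faith, an action can be timed on each run. The helper below is a small sketch using only the standard library; `timed` is a hypothetical name.

```python
import time


def timed(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start
```

In the notebook, `timed(df.count)` can be run twice in a row and the two elapsed times compared; no timings are quoted here because they depend on the machine.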

In [11]:
reviews_df = df[['asin', 'overall']]

In [12]:
# Convert the first n rows of a Spark DataFrame to a pandas DataFrame
def show(df, n=5):
    return df.limit(n).toPandas()

In [13]:
show(reviews_df)

| | asin | overall |
|---|---|---|
| 0 | 0439893577 | 5.0 |
| 1 | 0439893577 | 4.0 |
| 2 | 0439893577 | 5.0 |
| 3 | 0439893577 | 5.0 |
| 4 | 0439893577 | 4.0 |


In [15]:
reviews_df.show()

+----------+-------+
|      asin|overall|
+----------+-------+
|0439893577|    5.0|
|0439893577|    4.0|
|0439893577|    5.0|
|0439893577|    5.0|
|0439893577|    4.0|
|0439893577|    3.0|
|0439893577|    3.0|
|0439893577|    5.0|
|0439893577|    4.0|
|0439893577|    3.0|
|0439893577|    5.0|
|0439893577|    5.0|
|0439893577|    3.0|
|0439893577|    5.0|
|0439893577|    5.0|
|0439893577|    5.0|
|0439893577|    5.0|
|048645195X|    5.0|
|048645195X|    4.0|
|048645195X|    5.0|
+----------+-------+
only showing top 20 rows



In [28]:
reviews_df.count()

167597

In [29]:
show(reviews_df)

| | asin | overall |
|---|---|---|
| 0 | 0439893577 | 5.0 |
| 1 | 0439893577 | 4.0 |
| 2 | 0439893577 | 5.0 |
| 3 | 0439893577 | 5.0 |
| 4 | 0439893577 | 4.0 |


In [31]:
sorted_review_df = reviews_df.sort('overall')

In [32]:
show(sorted_review_df)

| | asin | overall |
|---|---|---|
| 0 | 786955708 | 1.0 |
| 1 | 976990709 | 1.0 |
| 2 | 963679600 | 1.0 |
| 3 | 786955708 | 1.0 |
| 4 | 974665207 | 1.0 |


In [16]:
# Spark DataFrames support SQL-style functions
import pyspark.sql.functions as F

In [17]:
counts = reviews_df.agg(F.countDistinct('overall'))

In [18]:
query = """
SELECT overall, COUNT(*)
FROM reviews
GROUP BY overall
ORDER BY overall
"""

In [19]:
reviews_df.createOrReplaceTempView('reviews')

In [20]:
output = spark.sql(query)

In [21]:
show(output, n=1000)

| | overall | count(1) |
|---|---|---|
| 0 | 1.0 | 4707 |
| 1 | 2.0 | 6298 |
| 2 | 3.0 | 16357 |
| 3 | 4.0 | 37445 |
| 4 | 5.0 | 102790 |
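The same grouping can also be written with DataFrame methods instead of a SQL string. The sketch below wraps that in a hypothetical helper, `rating_counts`, and then sanity-checks the counts shown above: they should add up to the 167597 rows reported by `count()`.

```python
def rating_counts(reviews_df):
    """DataFrame-API version of the GROUP BY query.

    The count column is named `count` here rather than `count(1)`.
    """
    return reviews_df.groupBy('overall').count().orderBy('overall')


# Counts copied from the query output above, keyed by star rating.
shown_counts = {1.0: 4707, 2.0: 6298, 3.0: 16357, 4.0: 37445, 5.0: 102790}

total = sum(shown_counts.values())              # 167597, matching the row count
share_of_five_star = shown_counts[5.0] / total  # roughly 0.61
```

In the notebook, `show(rating_counts(reviews_df))` should give the same five rows as the SQL version. The quick arithmetic also shows that about 61% of the reviews are five-star.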


In [23]:
# Resilient Distributed Dataset (RDD): the underlying distributed collection of rows
reviews_df.rdd

MapPartitionsRDD[48] at javaToPython at NativeMethodAccessorImpl.java:0

### Count the words in the first row

In [24]:
row_one = df.first()

In [25]:
row_one

Row(asin='0439893577', helpful=[0, 0], overall=5.0, reviewText='I like the item pricing. My granddaughter wanted to mark on it but I wanted it just for the letters.', reviewTime='01 29, 2014', reviewerID='A1VXOAVRGKGEAK', reviewerName='Angie', summary='Magnetic board', unixReviewTime=1390953600)

In [26]:
def word_count(text):
    return len(text.split())

In [27]:
word_count(row_one['reviewText'])

20
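Before wrapping a function as a UDF, it can be worth testing it in plain Python. One caveat: Spark passes `None` to a Python UDF when a value is null, and `None.split()` raises an error. Whether this dataset contains null review texts is not checked here, so the null-safe variant below, with the hypothetical name `word_count_safe`, is only a precaution.

```python
def word_count_safe(text):
    """Count whitespace-separated words; return None for a missing text."""
    if text is None:
        return None
    return len(text.split())


# Review text of the first row, copied from the Row output above.
first_review = ("I like the item pricing. My granddaughter wanted to mark on it "
                "but I wanted it just for the letters.")

print(word_count_safe(first_review))  # 20
print(word_count_safe(None))          # None
```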

In [28]:
# F.udf wraps a Python function so that Spark, which runs on the JVM, can apply it to a column
# pyspark.sql is the Spark DataFrame API; Spark SQL uses an extended, Hive-compatible SQL dialect
from pyspark.sql.types import IntegerType
word_count_udf = F.udf(word_count, IntegerType())

In [31]:
review_text_col = df['reviewText']

In [32]:
counts_df = df.withColumn('wordCount', word_count_udf(review_text_col))

In [33]:
show(counts_df).T

| | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| asin | 0439893577 | 0439893577 | 0439893577 | 0439893577 | 0439893577 |
| helpful | [0, 0] | [1, 1] | [1, 1] | [0, 0] | [1, 1] |
| overall | 5 | 4 | 5 | 5 | 4 |
| reviewText | I like the item pricing. My granddaughter want... | Love the magnet easel... great for moving to d... | Both sides are magnetic. A real plus when you... | Bought one a few years ago for my daughter and... | I have a stainless steel refrigerator therefor... |
| reviewTime | 01 29, 2014 | 03 28, 2014 | 01 28, 2013 | 02 8, 2014 | 05 5, 2014 |
| reviewerID | A1VXOAVRGKGEAK | A8R62G708TSCM | A21KH420DK0ICA | AR29QK6HPFYZ4 | ACCH8EOML6FN5 |
| reviewerName | Angie | Candace | capemaychristy | dcrm | DoyZ |
| summary | Magnetic board | it works pretty good for moving to different a... | love this! | Daughters love it | Great to have so he can play with his alphabet... |
| unixReviewTime | 1390953600 | 1395964800 | 1359331200 | 1391817600 | 1399248000 |
| wordCount | 20 | 22 | 76 | 31 | 47 |


In [34]:
# Register a Python function with Spark SQL, then call it from inside a SQL query

from pyspark.sql.types import IntegerType
word_count_udf = F.udf(word_count, IntegerType())

df.createOrReplaceTempView('reviews')
spark.udf.register('word_count', word_count_udf)

<function __main__.word_count(text)>

In [35]:
query = """
SELECT asin, overall, reviewText, word_count(reviewText) AS wordCount
FROM reviews
"""

In [36]:
counts_df = spark.sql(query)

In [37]:
show(counts_df)

| | asin | overall | reviewText | wordCount |
|---|---|---|---|---|
| 0 | 0439893577 | 5.0 | I like the item pricing. My granddaughter want... | 20 |
| 1 | 0439893577 | 4.0 | Love the magnet easel... great for moving to d... | 22 |
| 2 | 0439893577 | 5.0 | Both sides are magnetic. A real plus when you... | 76 |
| 3 | 0439893577 | 5.0 | Bought one a few years ago for my daughter and... | 31 |
| 4 | 0439893577 | 4.0 | I have a stainless steel refrigerator therefor... | 47 |


In [38]:
def count_all_the_things(text):
    return [len(text), len(text.split())]

In [39]:
from pyspark.sql.types import ArrayType, IntegerType
count_udf = F.udf(count_all_the_things, ArrayType(IntegerType()))

In [43]:
counts_df = df.withColumn('counts', count_udf(df['reviewText']))

In [44]:
show(counts_df, 1)

| | asin | helpful | overall | reviewText | reviewTime | reviewerID | reviewerName | summary | unixReviewTime | counts |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0439893577 | [0, 0] | 5.0 | I like the item pricing. My granddaughter want... | 01 29, 2014 | A1VXOAVRGKGEAK | Angie | Magnetic board | 1390953600 | [100, 20] |


In [45]:
counts_df[['asin','helpful','counts']].printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- counts: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [98]:
slim_counts_df = (
    df.drop('reviewTime')
      .drop('helpful')
      .withColumn('counts', count_udf(df['reviewText']))
      .drop('reviewText')
)

In [101]:
show(slim_counts_df, n=1)

| | asin | overall | reviewerID | reviewerName | summary | unixReviewTime | counts |
|---|---|---|---|---|---|---|---|
| 0 | 0439893577 | 5.0 | A1VXOAVRGKGEAK | Angie | Magnetic board | 1390953600 | [100, 20] |
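Because `DataFrame.drop` accepts several column names at once, the chain of `drop` calls can be shortened. The sketch below is one possible variant, wrapped in a hypothetical helper `slim_counts` that takes the DataFrame and the UDF as arguments; it should yield the same columns in the same order as the chained version.

```python
def slim_counts(df, count_udf):
    """Add the `counts` column, then drop the columns that are no longer needed."""
    return (
        df.withColumn('counts', count_udf(df['reviewText']))  # compute before dropping reviewText
          .drop('reviewTime', 'helpful', 'reviewText')        # one call, several columns
    )
```

In the notebook this would be called as `show(slim_counts(df, count_udf), n=1)`.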


In [96]:
import this

The Zen of Python, by Tim Peters

Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!
