# SI 618: Data Manipulation and Analysis
## 12 - Big Data III: Spark DataFrames

### Dr. Chris Teplovs, School of Information, University of Michigan
<small><a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by-nc-sa/4.0/88x31.png" /></a>This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/">Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License</a>.

### [PDF of today's slides](resources/618_12_Spark_DataFrames.pdf) <-- LINK THERE

In [None]:
df_from_other_list = spark.createDataFrame([('Chris',67),('Logan',70)], ['name','score'])
df_from_other_list.show()

In [None]:
from pyspark.sql.types import FloatType
df_from_list = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], FloatType())
df_from_list.show()

In [None]:
lot_rdd = sc.parallelize([('Chris',67),('Logan',70)])

In [None]:
dfPeople = spark.createDataFrame(lot_rdd)
dfPeople.show()

In [None]:
from pyspark.sql import Row
lot_rdd_named_columns = lot_rdd.map(lambda x: Row(name=x[0], score=int(x[1])))
dfPeople_named_columns = spark.createDataFrame(lot_rdd_named_columns)
dfPeople_named_columns.show()

In [None]:
df = spark.read.json('s3://umsi-data-science/data/yelp/business.json')

In [None]:
df.take(5)

In [None]:
df.printSchema()

In [None]:
df.select("name").show()

In [None]:
df.select(df['name'], df['review_count'] + 1).show()

In [None]:
df.filter(df['stars'] >= 4).show()

In [None]:
df.groupBy('stars').count().show()

In [None]:
df.groupBy('stars').count().sort('stars', ascending=False).show()

In [None]:
df_from_other_list = spark.createDataFrame([('Chris',[67,42]),('Logan',[70,72])],['name','scores'])

In [None]:
df_from_other_list.show()

In [None]:
from pyspark.sql.functions import explode

In [None]:
df_exploded = df_from_other_list.withColumn('score',explode('scores'))

In [None]:
df_exploded.show()

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col
df_exploded.withColumn('good',F.when(df_exploded['score'] > 50,1).otherwise(0)).show()

In [None]:
STOPWORDS = {'i', 'we', 'ourselves', 'hers', 'between', 'yourself', 'but', 'again', 'there', 'about', 'once', 'during', 'out', 'very', 'having', 'with', 'they', 'own', 'an', 'be', 'some', 'for', 'do', 'its', 'yours', 'such', 'into', 'of', 'most', 'itself', 'other', 'off', 'is', 's', 'am', 'or', 'who', 'as', 'from', 'him', 'each', 'the', 'themselves', 'until', 'below', 'are', 'we', 'these', 'your', 'his', 'through', 'don', 'nor', 'me', 'were', 'her', 'more', 'himself', 'this', 'down', 'should', 'our', 'their', 'while', 'above', 'both', 'up', 'to', 'ours', 'had', 'she', 'all', 'no', 'when', 'at', 'any', 'before', 'them', 'same', 'and', 'been', 'have', 'in', 'will', 'on', 'does', 'yourselves', 'then', 'that', 'because', 'what', 'over', 'why', 'so', 'can', 'did', 'not', 'now', 'under', 'he', 'you', 'herself', 'has', 'just', 'where', 'too', 'only', 'myself', 'which', 'those', 'i', 'after', 'few', 'whom', 't', 'being', 'if', 'theirs', 'my', 'against', 'a', 'by', 'doing', 'it', 'how', 'further', 'was', 'here', 'than'}
sw = sc.broadcast(STOPWORDS)

## Text analysis of Yelp reviews, warm-up

In [None]:
# This takes about 3 minutes (180s) on a single-node m4.xlarge cluster
df = spark.read.json('s3://umsi-data-science/data/yelp/review.json.gz')

In [None]:
df.printSchema()

### Q1: How many reviews are there?

In [None]:
# Insert your code here

### Q2: How many positive (i.e. 4- or 5-star) reviews are there? 

In [None]:
# Insert your code here

# -----


Let's read in two of the three data files from the Yelp academic dataset (https://www.kaggle.com/yelp-dataset/yelp-dataset) and examine the schemas for each one (we're skipping the reviews.json file for this part of the class):

In [None]:
business = spark.read.json('s3://umsi-data-science/data/yelp/business.json')

In [None]:
business.printSchema()

In [None]:
# review = spark.read.json('s3://umsi-data-science/data/yelp/review.json.gz')

In [None]:
# review.printSchema()

In [None]:
tip = spark.read.json('s3://umsi-data-science/data/yelp/tip.json')

In [None]:
tip.printSchema()

### Let's try to find the name of the business that has the highest number of "tips":

In [None]:
most_tips = tip.groupBy('business_id').count().sort('count', ascending = False)

In [None]:
from pyspark.sql.functions import col
most_tips = most_tips.withColumn('the_count',col('count'))

In [None]:
most_tips.show()

In [None]:
joined = most_tips.join(business,'business_id','left').sort('the_count',ascending=False)

In [None]:
most_tips_joined = joined.select("name","the_count").filter(joined['the_count'] > 1000).collect()

In [None]:
for b in most_tips_joined:
    print(b.name,b.count,b.the_count)


## Your turn
Use a combination of Spark and plain old python code to answer the following questions.  Include code and written responses in English for each question.

### Q3. How many businesses in the data set are located in the state of Ohio (OH)?

In [None]:
# insert your code here

### Q4. How many Pennsylvania-based businesses have a hipster ambience?

In [None]:
# insert your code here

### Q5. Which Nevada-based business has the most liked tip, and what is the text of the tip?

In [None]:
# insert your code here

### Q6. Excluding businesses in the state of Nevada, list 10 businesses with the highest number of tips

In [None]:
# insert your code here

### Q7. List the names of the divey businesses from Ohio that have an overall rating of 4 or more stars and have at least 1000 tips.
You might want to do this in several steps.

In [None]:
# insert your code here

## END OF IN-CLASS NOTEBOOK
Please download the notebook in HTML and IPYNB formats and submit both to Canvas.

### **PLEASE REMEMBER TO TERMINATE YOUR CLUSTER(S) WHEN YOU ARE DONE!**