# SI 618: 
## Day 13: Spark

### Dr. Chris Teplovs, School of Information, University of Michigan
<small><a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by-nc-sa/4.0/88x31.png" /></a> This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/">Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License</a>.</small>
    
### Please ensure you have this version:
Version 2022.04.11.1.CT

In [1]:
print("Hello, world")

Hello, world


In [2]:
spark # (newer) for Spark DataFrames

In [4]:
sc # (older) for Spark RDDs

In [5]:
# do not overwrite these two variables (spark and sc)

Load a (text) file into an RDD:

In [6]:
r = sc.textFile('/scratch/si618w22_class_root/si618w22_class/shared_data/nytimes_news_articles.txt')

In [7]:
type(r)

pyspark.rdd.RDD

In [8]:
r.take(3) # look at the first few lines

['URL: http://www.nytimes.com/2016/06/30/sports/baseball/washington-nationals-max-scherzer-baffles-mets-completing-a-sweep.html',
 '',
 'WASHINGTON — Stellar pitching kept the Mets afloat in the first half of last season despite their offensive woes. But they cannot produce an encore of their pennant-winning season if their lineup keeps floundering while their pitching is nicked, bruised and stretched thin.']

Load a (compressed) CSV file into a spark dataframe:

In [None]:
sf_fire_file = "/scratch/si618w22_class_root/si618w22_class/shared_data/Fire_Incidents.csv.gz"
fire_df = spark.read.csv(sf_fire_file, header=True)

In [None]:
len(fire_df.columns)

In [None]:
fire_df.schema

In [None]:
fire_df.printSchema()

In [None]:
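# import only the functions used below; a star import would shadow Python built-ins such as sum and max
from pyspark.sql.functions import col, countDistinct, to_timestamp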

few_fire_df = (fire_df
  .select("Incident Number", "Alarm DtTm", "Primary Situation") 
  .where(col("Primary Situation") != "311 - Medical assist, assist EMS crew"))
few_fire_df.show(5, truncate=False)

In [None]:
(fire_df
  .select("Primary Situation")
  .where(col("Primary Situation").isNotNull())
  .agg(countDistinct("Primary Situation").alias("DistinctCallTypes"))
  .show())

List the distinct values of `Primary Situation`:

In [None]:
(fire_df
  .select("Primary Situation")
  .where(col("Primary Situation").isNotNull())
  .distinct()
  .show(10, False))

Convert strings to timestamps:

In [None]:
fire_ts_df = (fire_df
  .withColumn("AlarmDateTime", to_timestamp(col("Alarm DtTm")))
  .drop("Alarm DtTm") 
  .withColumn("ArrivalDateTime", to_timestamp(col("Arrival DtTm")))
  .drop("Arrival DtTm") )

# Select the converted columns
(fire_ts_df
  .select("AlarmDateTime", "ArrivalDateTime")
  .show(5, False))
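
As a plain-Python illustration of what a string-to-timestamp conversion involves, here is a minimal sketch using `datetime.strptime`. The sample strings and the `ASSUMED_FORMAT` pattern are made up for illustration and are not taken from the fire incidents file; inspect a few real values before relying on any particular layout. Spark's `to_timestamp` also accepts an optional format argument for cases where the default parsing does not fit the data.

```python
from datetime import datetime

# Assumed layout of the raw strings (hypothetical, not confirmed by the data set).
ASSUMED_FORMAT = "%m/%d/%Y %I:%M:%S %p"

alarm_raw = "06/30/2016 02:15:00 PM"      # made-up sample value
arrival_raw = "06/30/2016 02:21:30 PM"    # made-up sample value

# Parse both strings into datetime objects.
alarm = datetime.strptime(alarm_raw, ASSUMED_FORMAT)
arrival = datetime.strptime(arrival_raw, ASSUMED_FORMAT)

# Once both values are real timestamps, arithmetic such as a response time is possible.
response_seconds = (arrival - alarm).total_seconds()
print(alarm, arrival, response_seconds)
```

Keeping the values as strings would make this kind of subtraction impossible, which is the main reason to convert them.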

### Q1: Provide a description of your understanding of what the Great Lakes HPC Cluster is used for, and propose some ideas about the sorts of data you could analyze with the enhanced computing power that it provides.

I think Great Lakes can be used to train models on large amounts of data, or to train very complex models, which greatly reduces training time and makes parameter tuning more practical.

# Spark RDDs

### Background: mapping and reducing in (plain old) python

In [9]:
numbers = [1, 2, 3, 4]

In [10]:
def square(number):
    return number**2

In [11]:
map(square, numbers)

<map at 0x2ab1e932e490>

In [12]:
list(map(square, numbers))

[1, 4, 9, 16]

In [13]:
for squared_number in map(square, numbers):
    print(squared_number)

1
4
9
16
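
The `<map at 0x...>` output above shows that `map` returns a lazy iterator rather than a list. A small sketch of what that implies (the variable names here are illustrative only): the squares are computed on demand, and the iterator is exhausted after one pass.

```python
numbers = [1, 2, 3, 4]

# map() does no work yet; it only remembers the function and the input.
squares = map(lambda n: n ** 2, numbers)

# The first pass consumes the iterator and computes the values.
first_pass = list(squares)

# A second pass finds nothing left to consume.
second_pass = list(squares)

print(first_pass, second_pass)
```

Spark transformations are lazy in a similar spirit: nothing is computed until an action such as `take` or `collect` asks for results.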


Let's grab `reduce` from functools:

In [14]:
from functools import reduce

In [17]:
reduce(lambda a,b: a+b, [1, 4, 9, 16])

30

In [18]:
reduce(lambda a,b: a+b, map(square, numbers))

30

Don't like the whole `lambda a,b: a+b`?  Use the `add` function from the `operator` library:

In [19]:
from operator import add

In [20]:
reduce(add, map(square, numbers))

30

First, let's load our regular expressions library (yup, again):

In [21]:
import re

Now let's load a data file into an RDD:

In [22]:
input_file = sc.textFile('/nfs/turbo/arcts-data-hadoop-stage/data/Gutenberg.txt')

We're going to be using the same regex over and over again, so it's best to compile it once up front and reuse the compiled pattern:

In [24]:
WORD_RE = re.compile(r"\b[\w']+\b")
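
To see what this pattern treats as a word, here is a quick check on a made-up sentence (the sentence is an illustration, not part of the data set). Apostrophes inside a word are kept, while surrounding punctuation is dropped.

```python
import re

WORD_RE = re.compile(r"\b[\w']+\b")

sample = "It's a fine day, isn't it?"   # made-up example text
tokens = WORD_RE.findall(sample)
print(tokens)
```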

In [25]:
input_file.take(2)

['', 'LINCOLN LETTERS']

The next block does three things, which we'll talk about in class:

In [26]:
word_count1 = input_file.flatMap(lambda line: WORD_RE.findall(line+''))
word_count2 = word_count1.map(lambda word: (word, 1)) 
word_count3 = word_count2.reduceByKey(lambda a, b: a + b)
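
The three steps above (`flatMap`, `map`, `reduceByKey`) can be mimicked on a tiny in-memory list in plain Python. This is only a sketch of the logic under the assumption that everything fits on one machine; the sample `lines` are made up, and Spark performs the same steps in parallel across partitions.

```python
import re
from collections import defaultdict
from itertools import chain

WORD_RE = re.compile(r"\b[\w']+\b")
lines = ["the cat and the hat", "", "the end"]   # made-up stand-in for the RDD

# Step 1 (flatMap): each line yields a list of words; flatten them into one sequence.
words = list(chain.from_iterable(WORD_RE.findall(line) for line in lines))

# Step 2 (map): turn every word into a (key, value) pair.
pairs = [(word, 1) for word in words]

# Step 3 (reduceByKey): combine the values of all pairs that share a key.
totals = defaultdict(int)
for word, n in pairs:
    totals[word] += n
word_counts = dict(totals)

print(word_counts)
```

Note that the empty line contributes no words at all, which is the difference between `flatMap` and a plain `map` that would have produced an empty list for it.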

In [27]:
word_count3

PythonRDD[10] at RDD at PythonRDD.scala:53

In [28]:
word_count3.take(3)

[('of', 6666351), ('there', 508083), ('answered', 74492)]

Insert your answer here.

Now let's sort the results by value in descending order and put the results in another RDD:

In [29]:
word_counts_sorted = word_count3.sortBy(lambda x: x[1], ascending=False)
# top100_sorted = sc.parallelize(word_counts_sorted.take(100))
word_counts_sorted.take(10)

[('the', 11518058),
 ('of', 6666351),
 ('and', 6573202),
 ('to', 5692140),
 ('a', 4394263),
 ('in', 3419281),
 ('I', 3012929),
 ('that', 2729376),
 ('was', 2449989),
 ('he', 2116609)]

In [None]:
# take(100) returns a plain Python list of the 100 most frequent (word, count) pairs
top_100 = word_counts_sorted.take(100)
for word, count in top_100:
    print(word, '\t', count)

In [30]:
counts = input_file.flatMap(lambda line: WORD_RE.findall(line+'')) \
             .map(lambda word: word.lower()) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b) \
             .sortBy(lambda a: a[1],ascending=False)

In [31]:
counts.take(10)

[('the', 12502647),
 ('and', 6973848),
 ('of', 6761399),
 ('to', 5801177),
 ('a', 4601096),
 ('in', 3622307),
 ('i', 3033950),
 ('that', 2846731),
 ('he', 2671184),
 ('was', 2466757)]
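
Comparing the two top-10 lists, the count for `the` rises once words are lowercased, because variants such as `The` are merged into the same key. A minimal sketch of that effect with `collections.Counter` on a made-up word list:

```python
from collections import Counter

words = ["The", "the", "THE", "He", "he"]   # made-up sample

# Case-sensitive counting keeps each spelling as its own key.
case_sensitive = Counter(words)

# Lowercasing first merges the variants before counting.
case_insensitive = Counter(word.lower() for word in words)

print(case_sensitive["the"], case_insensitive["the"])
print(case_insensitive.most_common(1))
```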

In [None]:
words = input_file.flatMap(lambda line: WORD_RE.findall(line))

In [None]:
# give every word the same key (None) so that reduceByKey produces a single total word count
counts = words.map(lambda word: (None,1)).reduceByKey(lambda a, b: a + b)

In [None]:
counts.take(1)

In [None]:
word_counts = input_file.flatMap(lambda line: WORD_RE.findall(line)).countByValue()
type(word_counts)

In [None]:
word_counts

## Spark DataFrames

In [32]:
df_from_other_list = spark.createDataFrame([('Chris',67),('Logan',70)], ['name','score'])
df_from_other_list.show()

+-----+-----+
| name|score|
+-----+-----+
|Chris|   67|
|Logan|   70|
+-----+-----+



In [33]:
from pyspark.sql.types import FloatType
df_from_list = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], FloatType())
df_from_list.show()

+-----+
|value|
+-----+
|  1.0|
|  2.0|
|  3.0|
|  4.0|
|  5.0|
+-----+



In [34]:
type(df_from_list)

pyspark.sql.dataframe.DataFrame

In [35]:
df_from_list.rdd

MapPartitionsRDD[47] at javaToPython at NativeMethodAccessorImpl.java:0

In [36]:
df_from_list.rdd.collect()

[Row(value=1.0),
 Row(value=2.0),
 Row(value=3.0),
 Row(value=4.0),
 Row(value=5.0)]

In [38]:
# other ways to initialize

lot_rdd = sc.parallelize([('Chris',67),('Logan',70)])

In [39]:
dfPeople = spark.createDataFrame(lot_rdd)
dfPeople.show()

+-----+---+
|   _1| _2|
+-----+---+
|Chris| 67|
|Logan| 70|
+-----+---+



In [45]:
# other ways to initialize

from pyspark.sql import Row
lot_rdd_named_columns = lot_rdd.map(lambda x: Row(name=x[0], score=int(x[1])))
dfPeople_named_columns = spark.createDataFrame(lot_rdd_named_columns)
dfPeople_named_columns.show()

+-----+-----+
| name|score|
+-----+-----+
|Chris|   67|
|Logan|   70|
+-----+-----+



In [None]:
# one example

In [42]:
df = spark.read.json('/scratch/si618w22_class_root/si618w22_class/shared_data/yelp_academic_dataset_business.json.gz')

In [43]:
df.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [49]:
df.take(5)

[Row(address='2818 E Camino Acequia Drive', attributes=Row(AcceptsInsurance=None, AgesAllowed=None, Alcohol=None, Ambience=None, BYOB=None, BYOBCorkage=None, BestNights=None, BikeParking=None, BusinessAcceptsBitcoin=None, BusinessAcceptsCreditCards=None, BusinessParking=None, ByAppointmentOnly=None, Caters=None, CoatCheck=None, Corkage=None, DietaryRestrictions=None, DogsAllowed=None, DriveThru=None, GoodForDancing=None, GoodForKids='False', GoodForMeal=None, HairSpecializesIn=None, HappyHour=None, HasTV=None, Music=None, NoiseLevel=None, Open24Hours=None, OutdoorSeating=None, RestaurantsAttire=None, RestaurantsCounterService=None, RestaurantsDelivery=None, RestaurantsGoodForGroups=None, RestaurantsPriceRange2=None, RestaurantsReservations=None, RestaurantsTableService=None, RestaurantsTakeOut=None, Smoking=None, WheelchairAccessible=None, WiFi=None), business_id='1SWheh84yJXfytovILXOAQ', categories='Golf, Active Life', city='Phoenix', hours=None, is_open=0, latitude=33.5221425, longit

In [50]:
output = df.take(5)

In [51]:
output.address # fails: take() returns a Python list of Row objects; use output[0].address instead

AttributeError: 'list' object has no attribute 'address'

In [47]:
df.show(5)

+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|       city|               hours|is_open|     latitude|      longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|2818 E Camino Ace...|{null, null, null...|1SWheh84yJXfytovI...|   Golf, Active Life|    Phoenix|                null|      0|   33.5221425|   -112.0184807|Arizona Biltmore ...|      85016|           5|  3.0|   AZ|
|30 Eglinton Avenue W|{null, null, u'fu...|QXAEGFB4oINsVuTFx...|Specialty Food, R...|Mississauga|{9:0-1:0, 9:0-0:0...|      1|43.6054989743|

In [48]:
df.select("name").show()

+--------------------+
|                name|
+--------------------+
|Arizona Biltmore ...|
|Emerald Chinese R...|
|Musashi Japanese ...|
|Farmers Insurance...|
| Queen City Plumbing|
|       The UPS Store|
|    Edgeworxx Studio|
|           Supercuts|
|Vita Bella Fine D...|
| Options Salon & Spa|
|Nucleus Informati...|
|           Taco Bell|
|The Kilted Buffal...|
|       Marco's Pizza|
|          Baby Cakes|
|   Hot Yoga Wellness|
|          Knot Salon|
|Carluccio's Tivol...|
|Myron Hensel Phot...|
|  Totum Life Science|
+--------------------+
only showing top 20 rows



In [52]:
df.select("name").show(truncate=False)

+-------------------------------+
|name                           |
+-------------------------------+
|Arizona Biltmore Golf Club     |
|Emerald Chinese Restaurant     |
|Musashi Japanese Restaurant    |
|Farmers Insurance - Paul Lorenz|
|Queen City Plumbing            |
|The UPS Store                  |
|Edgeworxx Studio               |
|Supercuts                      |
|Vita Bella Fine Day Spa        |
|Options Salon & Spa            |
|Nucleus Information Service    |
|Taco Bell                      |
|The Kilted Buffalo Langtree    |
|Marco's Pizza                  |
|Baby Cakes                     |
|Hot Yoga Wellness              |
|Knot Salon                     |
|Carluccio's Tivoli Gardens     |
|Myron Hensel Photography       |
|Totum Life Science             |
+-------------------------------+
only showing top 20 rows



In [54]:
df.select(df['name'], df['review_count'] + 1).show()

+--------------------+------------------+
|                name|(review_count + 1)|
+--------------------+------------------+
|Arizona Biltmore ...|                 6|
|Emerald Chinese R...|               129|
|Musashi Japanese ...|               171|
|Farmers Insurance...|                 4|
| Queen City Plumbing|                 5|
|       The UPS Store|                 4|
|    Edgeworxx Studio|                 8|
|           Supercuts|                 4|
|Vita Bella Fine D...|                 9|
| Options Salon & Spa|                 9|
|Nucleus Informati...|                 6|
|           Taco Bell|                19|
|The Kilted Buffal...|                10|
|       Marco's Pizza|                17|
|          Baby Cakes|                 8|
|   Hot Yoga Wellness|                 5|
|          Knot Salon|                 6|
|Carluccio's Tivol...|                41|
|Myron Hensel Phot...|                22|
|  Totum Life Science|                24|
+--------------------+------------

In [55]:
df.count()

192609

In [57]:
df.filter(df['stars'] >= 4).select('name').show()

+--------------------+
|                name|
+--------------------+
|Musashi Japanese ...|
|Farmers Insurance...|
| Queen City Plumbing|
|Vita Bella Fine D...|
| Options Salon & Spa|
|       Marco's Pizza|
|   Hot Yoga Wellness|
|          Knot Salon|
|Carluccio's Tivol...|
|Myron Hensel Phot...|
|  Totum Life Science|
|      Fremont Arcade|
|        Hunk Mansion|
|      Marathon Diner|
|            Maurices|
|Maria's Mexican R...|
|       Bakery Gateau|
| Uncle Otis Clothing|
|        AW Collision|
|JSE Automotive Se...|
+--------------------+
only showing top 20 rows



In [58]:
df.groupBy('stars').count().show()

+-----+-----+
|stars|count|
+-----+-----+
|  3.5|35008|
|  4.5|27301|
|  2.5|18843|
|  1.0| 4874|
|  4.0|35969|
|  3.0|25996|
|  2.0|11426|
|  1.5| 4976|
|  5.0|28216|
+-----+-----+



In [59]:
df.groupBy('stars').count().sort('stars', ascending=False).show()

+-----+-----+
|stars|count|
+-----+-----+
|  5.0|28216|
|  4.5|27301|
|  4.0|35969|
|  3.5|35008|
|  3.0|25996|
|  2.5|18843|
|  2.0|11426|
|  1.5| 4976|
|  1.0| 4874|
+-----+-----+



In [60]:
df.groupBy('stars').count().sort('count', ascending=False).show()

+-----+-----+
|stars|count|
+-----+-----+
|  4.0|35969|
|  3.5|35008|
|  5.0|28216|
|  4.5|27301|
|  3.0|25996|
|  2.5|18843|
|  2.0|11426|
|  1.5| 4976|
|  1.0| 4874|
+-----+-----+



In [61]:
df_from_other_list = spark.createDataFrame([('Chris',[67,42]),('Logan',[70,72])],['name','scores'])

In [62]:
df_from_other_list.show()

+-----+--------+
| name|  scores|
+-----+--------+
|Chris|[67, 42]|
|Logan|[70, 72]|
+-----+--------+



In [63]:
from pyspark.sql.functions import explode

In [64]:
df_exploded = df_from_other_list.withColumn('score',explode('scores'))

In [65]:
df_exploded.show()

+-----+--------+-----+
| name|  scores|score|
+-----+--------+-----+
|Chris|[67, 42]|   67|
|Chris|[67, 42]|   42|
|Logan|[70, 72]|   70|
|Logan|[70, 72]|   72|
+-----+--------+-----+



In [66]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col
df_exploded.withColumn('good',F.when(df_exploded['score'] > 50,1).otherwise(0)).show()

+-----+--------+-----+----+
| name|  scores|score|good|
+-----+--------+-----+----+
|Chris|[67, 42]|   67|   1|
|Chris|[67, 42]|   42|   0|
|Logan|[70, 72]|   70|   1|
|Logan|[70, 72]|   72|   1|
+-----+--------+-----+----+
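
For intuition, the `explode` and `when`/`otherwise` steps above correspond roughly to the following plain-Python list comprehensions. This is a sketch of the logic on the same four scores, not how Spark executes it.

```python
rows = [("Chris", [67, 42]), ("Logan", [70, 72])]

# explode: emit one output row per element of the list column.
exploded = [(name, scores, score) for name, scores in rows for score in scores]

# when(score > 50, 1).otherwise(0): add a conditional flag to each row.
flagged = [(name, score, 1 if score > 50 else 0) for name, _, score in exploded]

print(flagged)
```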



## Text analysis of Yelp reviews, warm-up

In [67]:
%time df = spark.read.parquet('/scratch/si618w22_class_root/si618w22_class/shared_data/yelp_academic_dataset_review.parquet')

CPU times: user 1.73 ms, sys: 837 µs, total: 2.57 ms
Wall time: 224 ms


In [68]:
df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- stars: double (nullable = true)



### Q2: How many reviews are there?

In [69]:
df.count()

6685900

### How many reviews are there for each star rating?


In [76]:
grouped = df.groupBy('stars')


In [77]:
grouped

<pyspark.sql.group.GroupedData at 0x2ab1eb16ee20>

In [71]:
counts = grouped.count()

In [78]:
s = counts.sort('stars')

In [79]:
s.show()

+-----+-------+
|stars|  count|
+-----+-------+
|  1.0|1002159|
|  2.0| 542394|
|  3.0| 739280|
|  4.0|1468985|
|  5.0|2933082|
+-----+-------+



In [80]:
# in one line
df_new = df.groupBy('stars').count().sort('stars').show()

+-----+-------+
|stars|  count|
+-----+-------+
|  1.0|1002159|
|  2.0| 542394|
|  3.0| 739280|
|  4.0|1468985|
|  5.0|2933082|
+-----+-------+



In [82]:
df_new.select('stars') # intentional error

# show() prints the table and returns None, so df_new is None here

AttributeError: 'NoneType' object has no attribute 'select'
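
The same pitfall exists in plain Python with methods that act for their side effect and return `None`. A small analogy (not Spark code): `list.sort()` sorts in place and returns `None`, while `sorted()` returns a new list that can be used further.

```python
scores = [3, 1, 2]

# sort() works in place and returns None, much like show() prints and returns None.
result_of_sort = scores.sort()

# sorted() returns a new object, so the result can be chained or stored.
result_of_sorted = sorted([3, 1, 2])

print(result_of_sort, result_of_sorted)
```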

In [85]:
df_new = df.groupBy('stars').count().sort('stars')
df_new.select('stars').show() # works

+-----+
|stars|
+-----+
|  1.0|
|  2.0|
|  3.0|
|  4.0|
|  5.0|
+-----+



### How many positive (i.e. 4- or 5-star) reviews are there? 

In [86]:
from pyspark.sql.functions import col

In [88]:
df.filter(df['stars'] >= 4).count()

4402067

In [91]:
df.filter(col('stars') >= 4).count() # same as df['stars']

4402067

In [99]:
df.select('text').show(truncate=True)

+--------------------+
|                text|
+--------------------+
|Total bill for th...|
|Today was my seco...|
|This place has go...|
|Walked in around ...|
|I cannot believe ...|
|Unfortunately, I ...|
|if i can give thi...|
|This review is in...|
|I tried this plac...|
|Love this place d...|
|Came here on a Th...|
|They keep there a...|
|Met a friend for ...|
|We had dinner at ...|
|I am years out fr...|
|I've never experi...|
|((( LADIES BEWARE...|
|Th service here i...|
|I took my wife ou...|
|This company trie...|
+--------------------+
only showing top 20 rows



In [93]:
# NEXT

In [94]:
review = spark.read.parquet('/scratch/si618w22_class_root/si618w22_class/shared_data/yelp_academic_dataset_review.parquet')

In [95]:
business = spark.read.parquet('/scratch/si618w22_class_root/si618w22_class/shared_data/yelp_academic_dataset_business.parquet')

In [96]:
checkin = spark.read.parquet('/scratch/si618w22_class_root/si618w22_class/shared_data/yelp_academic_dataset_checkin.parquet')

In [97]:
tip = spark.read.parquet('/scratch/si618w22_class_root/si618w22_class/shared_data/yelp_academic_dataset_tip.parquet')

In [98]:
user = spark.read.parquet('/scratch/si618w22_class_root/si618w22_class/shared_data/yelp_academic_dataset_user.parquet')

### Display the schemas for each of the dataframes

In [None]:
for i in [review, business, checkin, tip, user]:
    i.printSchema()

In [None]:
# insert your code here

Now let's register a temporary view so that we can query the DataFrame with SQL:

In [None]:
review.createOrReplaceTempView('review')

In [None]:
query = """
SELECT count(*) AS the_count 
   FROM review 
"""


In [None]:
spark.sql(query).show()

# END OF NOTEBOOK

By the way, we can get pretty complex:

In [None]:
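# the query joins three tables, so register the other two DataFrames as temporary views first
business.createOrReplaceTempView('business')
user.createOrReplaceTempView('user')

q = """SELECT FIRST(user.name) as name,FIRST(user.user_id) as id,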
    SUM(CASE WHEN business.state = 'AZ' THEN 1 ELSE 0 END) AS az_count,
    FIRST(user.review_count) as total_count,
    ROUND((SUM(CASE WHEN business.state = 'AZ' THEN 1 ELSE 0 END)
        / FIRST(user.review_count) * 100), 6)
        as percent
FROM review
    JOIN business on review.business_id = business.business_id
    JOIN user on review.user_id = user.user_id
GROUP BY user.user_id
ORDER BY az_count DESC
LIMIT 5"""
spark.sql(q).show()
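
The core trick in that query is conditional aggregation: `SUM(CASE WHEN ... THEN 1 ELSE 0 END)` counts only the rows that satisfy a condition within each group. Below is a minimal sketch of the same pattern using Python's built-in `sqlite3` module on tiny made-up tables. The table contents are hypothetical, the `user` join is left out, and SQLite is only a stand-in here; its SQL dialect differs from Spark SQL in some details.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Tiny made-up tables that mimic the shape of the Yelp data.
conn.executescript("""
CREATE TABLE business (business_id TEXT, state TEXT);
CREATE TABLE review (user_id TEXT, business_id TEXT);
INSERT INTO business VALUES ('b1', 'AZ'), ('b2', 'NV'), ('b3', 'AZ');
INSERT INTO review VALUES
    ('u1', 'b1'), ('u1', 'b2'), ('u1', 'b3'),
    ('u2', 'b2');
""")

# Per user: how many reviews are for Arizona businesses, and how many in total.
query = """
SELECT review.user_id,
       SUM(CASE WHEN business.state = 'AZ' THEN 1 ELSE 0 END) AS az_count,
       COUNT(*) AS total_count
FROM review
    JOIN business ON review.business_id = business.business_id
GROUP BY review.user_id
ORDER BY az_count DESC
"""
rows = conn.execute(query).fetchall()
conn.close()

print(rows)
```

Trying a query on a few hand-checkable rows like this is a cheap way to confirm the logic before running it on millions of reviews.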