In [1]:
import os
import pyspark

conf = pyspark.SparkConf()
conf = conf.setAppName("rl5083-hw2")
conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4040') ## to setup SPARK UI
conf = conf.set('spark.jars', os.environ['GRAPHFRAMES_PATH']) ## graphframes in spark configuration
sc = pyspark.SparkContext(conf=conf)
sc

24/10/22 23:40:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
spark = pyspark.SQLContext(sc)
spark



<pyspark.sql.context.SQLContext at 0x78811512ced0>

# Question 1 - 25 pts

Data: shared/data/Bakery.csv   

Show the highest selling item for Mondays, per hour, for the 7AM to 11AM hours. Note that "weekday", "period" have to be computed.

For example (these are made up numbers...)    
 Item qty, weekday, Date , Hour-period, qty    
 Bread, 102, Monday, 2016-10-31, 7AM   
 Coffee, 132, Monday, 2016-10-31, 8AM    
 :

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, hour, count, when, dayofweek
from pyspark.sql import functions as F
from pyspark.sql import Window

In [4]:
df = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("shared/data/Bakery.csv") 

df = df.withColumn("Hour", date_format(col("Time"), "hha"))
df = df.withColumn("weekday", 
    when(date_format(col("Date"), "E") == "Mon", "Monday")
    .when(date_format(col("Date"), "E") == "Tue", "Tuesday")
    .when(date_format(col("Date"), "E") == "Wed", "Wednesday")
    .when(date_format(col("Date"), "E") == "Thu", "Thursday")
    .when(date_format(col("Date"), "E") == "Fri", "Friday")
    .when(date_format(col("Date"), "E") == "Sat", "Saturday")
    .otherwise("Sunday")  # Assuming 7 represents Sunday
)
df = df.filter((col("weekday") == "Monday") & (col("Hour").between("07AM", "11AM")))

sales_per_hour = df.groupBy("Date", "Hour", "Item").agg(count("Transaction").alias("qty"))

highest_selling_items = sales_per_hour.withColumn("rank", F.row_number().over(Window.partitionBy("Date", "Hour").orderBy(col("qty").desc()))).filter(col("rank") == 1).drop("rank")
highest_selling_items = highest_selling_items.withColumn("weekday", F.date_format(col("Date"), "EEEE"))
highest_selling_items = highest_selling_items.withColumn("Hour", F.regexp_replace(col("Hour"), r"^0", ""))
highest_selling_items = highest_selling_items.select("Item", "qty", "weekday", "Date", F.col("Hour").alias("Hour-period"))

print("Highest Selling Items for Mondays (7AM to 11AM):")
highest_selling_items.show()

24/10/22 23:41:04 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

Highest Selling Items for Mondays (7AM to 11AM):


                                                                                

+---------+---+-------+----------+-----------+
|     Item|qty|weekday|      Date|Hour-period|
+---------+---+-------+----------+-----------+
|   Coffee|  2| Monday|2016-10-31|        8AM|
|   Coffee| 11| Monday|2016-10-31|        9AM|
|   Coffee| 10| Monday|2016-10-31|       10AM|
|   Coffee| 13| Monday|2016-10-31|       11AM|
|   Coffee|  1| Monday|2016-11-07|        8AM|
|   Pastry|  3| Monday|2016-11-07|        9AM|
|   Coffee|  7| Monday|2016-11-07|       10AM|
|   Coffee| 10| Monday|2016-11-07|       11AM|
|   Coffee|  1| Monday|2016-11-14|        7AM|
|   Coffee|  2| Monday|2016-11-14|        8AM|
|   Coffee|  5| Monday|2016-11-14|        9AM|
|   Coffee|  5| Monday|2016-11-14|       10AM|
|    Bread|  5| Monday|2016-11-14|       11AM|
|   Coffee|  1| Monday|2016-11-21|        7AM|
|   Coffee|  2| Monday|2016-11-21|        8AM|
|   Coffee|  8| Monday|2016-11-21|        9AM|
|   Coffee|  4| Monday|2016-11-21|       10AM|
|   Coffee|  4| Monday|2016-11-21|       11AM|
|   Coffee|  

# Question 2 - 25 pts

Data: shared/data/Bakery.csv

Show the top 2 (by qty) items bought by Daypart, by DayType.

Note:    
Daypart = Breakfast if 6AM – 10:59AM, Lunch if 11:01AM – 3:59PM, Dinner   
otherwise   
DayType = Weekend if Sat, Sun, Weekday otherwise

For example (not necessarily the right numbers….)     
 Weekend, Breakfast, (coffee, Muffin)   
 Weekend, Lunch, (cookies, pastry)   

The Answer MUST include the 2 items in a single column

In [5]:
from pyspark.sql.functions import rank, concat_ws, collect_list

In [6]:
df2 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("shared/data/Bakery.csv") 

df2 = df2.withColumn("DayType",
                   when(dayofweek(col("Date")) == 7, "Weekend")  # Saturday
                   .when(dayofweek(col("Date")) == 1, "Weekend")  # Sunday
                   .otherwise("Weekday"))
df2 = df2.withColumn("Daypart",
                   when((col("Time").between("06:00:00", "10:59:59")), "Breakfast")
                   .when((col("Time").between("11:00:00", "15:59:59")), "Lunch")
                   .otherwise("Dinner"))

item_counts = df2.groupBy("DayType", "Daypart", "Item").agg(count("Transaction").alias("qty"))

top_items = item_counts.withColumn("rank", rank().over(Window.partitionBy("DayType", "Daypart").orderBy(col("qty").desc()))).filter(col("rank") <= 2)
top_items = top_items.groupBy("DayType", "Daypart").agg(concat_ws(", ", collect_list("Item")).alias("Top-2 Items"))

print("Top-2 (by qty) items bought by Daypart, by DayType:")
top_items.show()

Top-2 (by qty) items bought by Daypart, by DayType:


[Stage 10:>                                                         (0 + 1) / 1]

+-------+---------+-------------+
|DayType|  Daypart|  Top-2 Items|
+-------+---------+-------------+
|Weekday|Breakfast|Coffee, Bread|
|Weekday|   Dinner|Coffee, Bread|
|Weekday|    Lunch|Coffee, Bread|
|Weekend|Breakfast|Coffee, Bread|
|Weekend|   Dinner|Coffee, Bread|
|Weekend|    Lunch|Coffee, Bread|
+-------+---------+-------------+



                                                                                

# Question 3 - 20 pts

Data: shared/data/Restaurants_in_Durham_County_NC.json

Show the number of entities by "fields.rpt_area_desc"

Example (not true numbers):     
 "Food Service", 13   
 "Tatoo Establishment", 2   
 :

In [7]:
df3 = spark.read.json("shared/data//Restaurants_in_Durham_County_NC.json")

df3 = df3.groupBy("fields.rpt_area_desc").count()

print("Show the number of entities by \'fields.rpt_area_desc\':")
df3.show()

                                                                                

Show the number of entities by 'fields.rpt_area_desc':


[Stage 17:>                                                         (0 + 1) / 1]

+--------------------+-----+
|       rpt_area_desc|count|
+--------------------+-----+
|  Bed&Breakfast Home|    4|
|        Summer Camps|    4|
|        Institutions|   30|
|   Local Confinement|    2|
|         Mobile Food|  147|
|    School Buildings|   89|
|         Summer Food|  242|
|      Swimming Pools|  420|
|            Day Care|  173|
|Tattoo Establishm...|   36|
|    Residential Care|  154|
|   Bed&Breakfast Inn|    2|
|      Adult Day Care|    5|
|             Lodging|   62|
|        Food Service| 1093|
+--------------------+-----+



                                                                                

# Question 4 - 20 pts

Data: shared/data/populationbycountry19802010millions.csv

Show the country or region with the biggest percentage increase in population AND the country with biggest percentage decrease in population, between the years 1990 and 2000. Use only the countries, not 'World'.

Example (Not the real answer):    
North America, 2.30% <- assuming North America was max    
Aruba, -22.2%… <- assuming Aruba was min

In [8]:
from pyspark.sql.functions import concat, lit

In [9]:
df4 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("shared/data/populationbycountry19802010millions.csv") 

df4= df4.withColumnRenamed(df4.columns[0], "Country")
df4 = df4.filter(col("Country") != "World")
df4 = df4.withColumn("Change", ((col("2000") - col("1990")) / col("1990")) * 100)
df4 = df4.na.drop()
df4.createOrReplaceTempView("population_data")

max_increase_result = spark.sql("""
SELECT Country, Change 
FROM population_data
ORDER BY Change DESC 
LIMIT 1
""")
max_increase_result = max_increase_result.withColumn("Change", concat(col("Change"), lit("%")))

print("The biggest percentage increase in population between 1990 and 2000:")
max_increase_result.show()

24/10/22 23:41:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


The biggest percentage increase in population between 1990 and 2000:
+--------------------+------------------+
|             Country|            Change|
+--------------------+------------------+
|United Arab Emirates|76.27926078028749%|
+--------------------+------------------+



24/10/22 23:41:36 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , 1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010
 Schema: _c0, 1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010
Expected: _c0 but found: 
CSV file: file:///home/jovyan/shared/data/populationbycountry19802010millions.csv


In [10]:
max_decrease_result = spark.sql("""
SELECT Country, Change
FROM population_data
ORDER BY Change
LIMIT 1
""")
max_decrease_result = max_decrease_result.withColumn("Change", concat(col("Change"), lit("%")))

print("The biggest percentage decrease in population between 1990 and 2000:")
max_decrease_result.show()

The biggest percentage decrease in population between 1990 and 2000:
+----------+-------------------+
|   Country|             Change|
+----------+-------------------+
|Montserrat|-63.18732525629077%|
+----------+-------------------+



24/10/22 23:41:41 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , 1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010
 Schema: _c0, 1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010
Expected: _c0 but found: 
CSV file: file:///home/jovyan/shared/data/populationbycountry19802010millions.csv


# Question 5 - 20 pts

Data: hw1text (from HW1). 

Solve: do WordCount

Do word count exercise using pyspark. Ignore punctuation and normalize to lower case. i.e. replace characters in NOT in this set: [0-9a-z] with space.

HINT: You can use the sparkml package.

In [11]:
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.sql.functions import lower, regexp_replace, explode, split
import zipfile

In [12]:
# Here we put hw1text.zip and the Notebook under the same folder

zip_file_path = 'hw1text.zip'
extraction_dir = 'hw1text/'

os.makedirs(extraction_dir, exist_ok=True)

with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extraction_dir)

extracted_files = os.listdir(extraction_dir)

text_files_path = [
    "20-01.txt",
    "20-02.txt",
    "20-03.txt",
    "20-04.txt",
    "20-05.txt"
]

text_files = [os.path.join(extraction_dir, file) for file in text_files_path]

In [13]:
df5 = spark.read.text(text_files)

df5 = df5.select(
    regexp_replace(lower(col("value")), "[^0-9a-z]+", " ").alias("cleaned_text")
)

tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="words")
words_data = tokenizer.transform(df5)
word_counts = words_data.select(explode(col("words")).alias("word")).groupBy("word").count().orderBy("count", ascending=False)

print("Word Count:")
word_counts.show()

Word Count:




+----+------+
|word| count|
+----+------+
| the|163547|
|  to| 89046|
|   p| 78664|
|  of| 75568|
| and| 72730|
|  in| 56782|
|   a| 53198|
| for| 29770|
|that| 28852|
|  is| 27601|
|  on| 24485|
|   s| 23615|
|with| 19575|
| are| 19417|
|  it| 18231|
|  be| 17998|
|  as| 17796|
|have| 16188|
|  at| 15965|
|  we| 15754|
+----+------+
only showing top 20 rows



                                                                                

# Question 6 - 20 pts

Data: hw1text (from HW1)

Find the 10 most common bigrams.

HINT: You can use the sparkml package.

In [14]:
from pyspark.ml.feature import NGram

In [15]:
df6 = spark.read.text(text_files)

df6 = df6.select(
    regexp_replace(lower(col("value")), "[^0-9a-z]+", " ").alias("cleaned_text")
)

tokenizer6 = Tokenizer(inputCol="cleaned_text", outputCol="words")
words_data6 = tokenizer6.transform(df6)
ngram = NGram(n=2, inputCol="words", outputCol="bigrams")
bigram_data = ngram.transform(words_data6)
bigram_counts = bigram_data.select(explode(col("bigrams")).alias("bigram")).groupBy("bigram").count().orderBy(col("count").desc()).limit(10)

print("Top-10 most common bigrams:")
bigram_counts.show()

Top-10 most common bigrams:


[Stage 29:>                                                         (0 + 2) / 2]

+--------+-----+
|  bigram|count|
+--------+-----+
|  of the|17484|
|  in the|12808|
|   p the|10363|
|covid 19| 8762|
|  to the| 8372|
| for the| 5588|
|     n t| 5393|
|  on the| 5032|
|   to be| 4581|
| will be| 4177|
+--------+-----+



                                                                                

# Question 7 (Extra credit) - 40 pts

Data: durham-nc-foreclosure-2006-2016.json, Restaurants_in_Durham_County_NC.json
 
a) Find food service and active restaurants ("status" = "ACTIVE" and "rpt_area_desc" = "Food Service") closest to the following coordinate: of 35.994914, -78.897133, and show it.

b) With that restaurant in (a) as your center point, find the number of foreclosures within a 1 mile radius.

You can use an external library for calculating coordinate distances. The haversine library is available in Jupyterhub’s bigdata environment.

### a)

In [17]:
from haversine import haversine, Unit
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType

In [18]:
df7a = spark.read.json("shared/data/Restaurants_in_Durham_County_NC.json")
df7a.printSchema()

root
 |-- datasetid: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- closing_date: string (nullable = true)
 |    |-- est_group_desc: string (nullable = true)
 |    |-- geolocation: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- hours_of_operation: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- insp_freq: long (nullable = true)
 |    |-- opening_date: string (nullable = true)
 |    |-- premise_address1: string (nullable = true)
 |    |-- premise_address2: string (nullable = true)
 |    |-- premise_city: string (nullable = true)
 |    |-- premise_name: string (nullable = true)
 |    |-- premise_phone: string (nullable = true)
 |    |-- premise_state: string (nullable = true)
 |    |-- premise_zip: string (nullable = true)
 |    |-- risk: long (nullable = true)
 |    |-- rpt_area_desc: string (nullable = true)
 |    |-- seats: long (nullable = true)
 |    |-- sewage: string (nullable = true)
 |   

In [19]:
def calculate_distance(row):
    lat = row['fields']['geolocation'][0]
    lon = row['fields']['geolocation'][1]
    return (row['fields']['premise_name'], lat, lon, haversine(center_point, (lat, lon)))

In [20]:
schema = StructType([
    StructField("restaurant_name", StringType(), True),
    StructField("lat", FloatType(), True),
    StructField("lon", FloatType(), True),
    StructField("distance", FloatType(), True)
])

In [21]:
center_point = (35.994914, -78.897133)

df7a = df7a.filter(
    (F.col("fields.status") == "ACTIVE") &
    (F.col("fields.rpt_area_desc") == "Food Service") &
    (F.col("fields.geolocation").isNotNull())
)
df7a = df7a.rdd.map(calculate_distance)
df7a = spark.createDataFrame(df7a, schema)
df7a.printSchema()

root
 |-- restaurant_name: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)
 |-- distance: float (nullable = true)



In [22]:
df7a = df7a.orderBy("distance").limit(1)

print("The restaurant which is closest to (35.994914, -78.897133):")
df7a.show()

The restaurant which is closest to (35.994914, -78.897133):


[Stage 31:>                                                         (0 + 1) / 1]

+--------------------+---------+---------+---------+
|     restaurant_name|      lat|      lon| distance|
+--------------------+---------+---------+---------+
|OLD HAVANA SANDWI...|35.993282|-78.89813|0.2024912|
+--------------------+---------+---------+---------+



                                                                                

### b)

In [23]:
df7b = spark.read.json("shared/data/durham-nc-foreclosure-2006-2016.json")
schema = df7b.schema
df7b.printSchema()

root
 |-- datasetid: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- geocode: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- parcel_number: string (nullable = true)
 |    |-- year: string (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- record_timestamp: string (nullable = true)
 |-- recordid: string (nullable = true)



In [24]:
coordinates = df7a.select("lat", "lon").first()
center_lat = coordinates["lat"]
center_lon = coordinates["lon"]

                                                                                

In [25]:
print(f"Center point: ({center_lat}, {center_lon})")

Center point: (35.993282318115234, -78.89813232421875)


In [26]:
def foreclosure_within_radius(row):
    if row['fields']['geocode']:
        lat = row['fields']['geocode'][0]
        lon = row['fields']['geocode'][1]
        distance = haversine((center_lat, center_lon), (lat, lon), unit=Unit.MILES)
        return distance <= 1

In [27]:
foreclosures_within_radius = df7b.rdd.filter(foreclosure_within_radius)
df7b = spark.createDataFrame(foreclosures_within_radius, schema)
print("The number of foreclosures within a 1 mile radius:", df7b.count())

The number of foreclosures within a 1 mile radius: 320
