### Question 1 ####

# Restart Kernel
To ensure a clean state, please restart the kernel before proceeding to the next steps.
- In Jupyter Notebook, go to the menu: Kernel -> Restart & Clear Output

or run below code. 

In [1]:
from IPython.display import display
from ipykernel.kernelapp import IPKernelApp

def restart_kernel():
    app = IPKernelApp.instance()
    app.kernel.do_shutdown(True)

restart_kernel()

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit, date_format, hour, count, to_timestamp

# Reuse the existing Spark session
# spark session

import os
import pyspark

conf = pyspark.SparkConf()
# conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4041')

# conf.set('spark.sql.repl.eagerEval.enabled', True)
conf.set('spark.driver.memory','4g')

sc = pyspark.SparkContext(conf=conf)
spark = pyspark.SQLContext.getOrCreate(sc)
sc

# Load the data
file_path = "shared/hw2/Bakery.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the schema to understand the data structure
df.printSchema()

# Display the first few rows to inspect the data
df.show(5)

# Combine Date and Time into a single timestamp column
df = df.withColumn("timestamp", concat(col("Date"), lit(" "), date_format(col("Time"), 'HH:mm:ss')))
df = df.withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))

# Filter data between 11 AM and 1 PM
filtered_df = df.filter((hour(col("timestamp")) >= 11) & (hour(col("timestamp")) < 13))

# Extract date and group by item and day, then count the occurrences
result_df = filtered_df.withColumn("day", date_format(col("timestamp"), "yyyy-MM-dd")) \
    .groupBy("Item", "day") \
    .agg(count("Transaction").alias("qty"))

# Display the unsorted result
print("Unsorted Result:")
result_df.show()

# Sort by Item asc, day asc, and qty desc
sorted_result_df = result_df.orderBy(col("Item").asc(), col("day").asc(), col("qty").desc())

# Display the sorted result
print("Sorted Result:")
sorted_result_df.show()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/13 23:34:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/13 23:34:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/07/13 23:34:17 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
                                                                                

root
 |-- Date: date (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Transaction: integer (nullable = true)
 |-- Item: string (nullable = true)

+----------+-------------------+-----------+-------------+
|      Date|               Time|Transaction|         Item|
+----------+-------------------+-----------+-------------+
|2016-10-30|2024-07-13 09:58:11|          1|        Bread|
|2016-10-30|2024-07-13 10:05:34|          2| Scandinavian|
|2016-10-30|2024-07-13 10:05:34|          2| Scandinavian|
|2016-10-30|2024-07-13 10:07:57|          3|Hot chocolate|
|2016-10-30|2024-07-13 10:07:57|          3|          Jam|
+----------+-------------------+-----------+-------------+
only showing top 5 rows

Unsorted Result:


                                                                                

+-------------+----------+---+
|         Item|       day|qty|
+-------------+----------+---+
|         Cake|2016-11-06|  1|
|       Coffee|2016-11-12| 19|
|        Bread|2016-11-28|  5|
|         NONE|2016-12-31|  1|
|        Fudge|2017-01-30|  1|
|        Salad|2017-02-05|  1|
|      Cookies|2017-02-09|  5|
|Hot chocolate|2017-02-16|  5|
|Hot chocolate|2017-02-23|  1|
|         Eggs|2017-03-03|  1|
|       Muffin|2017-03-05|  3|
|     Sandwich|2017-03-16|  1|
|        Bread|2017-04-06|  2|
|Hot chocolate|2016-11-07|  1|
|Mineral water|2016-11-23|  1|
|     Art Tray|2016-12-10|  1|
|       Pastry|2016-12-12|  1|
|        Bread|2016-12-21|  5|
|      Brownie|2016-12-23|  2|
|        Fudge|2016-12-29|  1|
+-------------+----------+---+
only showing top 20 rows

Sorted Result:


[Stage 6:>                                                          (0 + 1) / 1]

+--------------------+----------+---+
|                Item|       day|qty|
+--------------------+----------+---+
|Afternoon with th...|2017-01-21|  2|
|Afternoon with th...|2017-01-22|  1|
|Afternoon with th...|2017-02-18|  1|
|           Alfajores|2016-11-02|  1|
|           Alfajores|2016-11-04|  1|
|           Alfajores|2016-11-08|  3|
|           Alfajores|2016-11-11|  3|
|           Alfajores|2016-11-12|  3|
|           Alfajores|2016-11-13|  1|
|           Alfajores|2016-11-17|  5|
|           Alfajores|2016-11-20|  4|
|           Alfajores|2016-11-25|  2|
|           Alfajores|2016-11-27|  2|
|           Alfajores|2016-12-03|  1|
|           Alfajores|2016-12-04|  2|
|           Alfajores|2016-12-05|  1|
|           Alfajores|2016-12-07|  1|
|           Alfajores|2016-12-09|  1|
|           Alfajores|2016-12-13|  1|
|           Alfajores|2016-12-14|  2|
+--------------------+----------+---+
only showing top 20 rows



                                                                                

### Question 2 ####

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit, date_format, hour, count, to_timestamp, when, dayofweek, row_number, collect_list, array_join
from pyspark.sql.window import Window

# Reuse the existing Spark session
spark = SparkSession.builder.getOrCreate()

# Load the data
file_path = "shared/hw2/Bakery.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Combine Date and Time into a single timestamp column
df = df.withColumn("timestamp", concat(col("Date"), lit(" "), date_format(col("Time"), 'HH:mm:ss')))
df = df.withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))

# Define dayparts
df = df.withColumn("daypart", 
                   when((hour(col("timestamp")) >= 6) & (hour(col("timestamp")) < 12), "Morning")
                   .when((hour(col("timestamp")) >= 12) & (hour(col("timestamp")) < 18), "Afternoon")
                   .when((hour(col("timestamp")) >= 18) & (hour(col("timestamp")) < 24), "Evening")
                   .otherwise("Night"))

# Define day types
df = df.withColumn("daytype", 
                   when(dayofweek(col("timestamp")).isin([1, 7]), "Weekend")
                   .otherwise("Weekday"))

# Group by item, daypart, and daytype, then count the occurrences
grouped_df = df.groupBy("Item", "daypart", "daytype") \
               .agg(count("Transaction").alias("qty"))

# Create a window specification to rank items by qty within each daypart and daytype
window_spec = Window.partitionBy("daypart", "daytype").orderBy(col("qty").desc())

# Add a row number to each row in the window
ranked_df = grouped_df.withColumn("rank", row_number().over(window_spec))

# Filter to get the top 3 items for each daypart and daytype
top_3_df = ranked_df.filter(col("rank") <= 3)

# Collect the top 3 items into a single column
result_df = top_3_df.groupBy("daypart", "daytype") \
                    .agg(array_join(collect_list(col("Item")), ", ").alias("top_3_items"))

# Show the result
result_df.show(truncate=False)


[Stage 11:>                                                         (0 + 1) / 1]

+---------+-------+------------------------------------------+
|daypart  |daytype|top_3_items                               |
+---------+-------+------------------------------------------+
|Afternoon|Weekday|Coffee, Bread, Tea                        |
|Afternoon|Weekend|Coffee, Bread, Tea                        |
|Evening  |Weekday|Coffee, Bread, Fudge                      |
|Evening  |Weekend|Tshirt, Afternoon with the baker, Postcard|
|Morning  |Weekday|Coffee, Bread, Pastry                     |
|Morning  |Weekend|Coffee, Bread, Pastry                     |
|Night    |Weekend|Bread                                     |
+---------+-------+------------------------------------------+



                                                                                

### Question 3 ###

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

# Reuse the existing Spark session
spark = SparkSession.builder.getOrCreate()

# Load the JSON data
file_path = "shared/hw2/Restaurants_in_Durham_County_NC.json"
df = spark.read.json(file_path)

# Group by rpt_area_desc and count the occurrences
result_df = df.groupBy("fields.rpt_area_desc").agg(count("*").alias("total_entities"))

# Show the result
result_df.show(truncate=False)


[Stage 18:>                                                         (0 + 1) / 1]

+---------------------+--------------+
|rpt_area_desc        |total_entities|
+---------------------+--------------+
|Bed&Breakfast Home   |4             |
|Summer Camps         |4             |
|Institutions         |30            |
|Local Confinement    |2             |
|Mobile Food          |147           |
|School Buildings     |89            |
|Summer Food          |242           |
|Swimming Pools       |420           |
|Day Care             |173           |
|Tattoo Establishments|36            |
|Residential Care     |154           |
|Bed&Breakfast Inn    |2             |
|Adult Day Care       |5             |
|Lodging              |62            |
|Food Service         |1093          |
+---------------------+--------------+



                                                                                

### Question 4 ###

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

# Initialize the Spark session
spark = SparkSession.builder.appName("PopulationAnalysis").getOrCreate()

# Load the CSV data
file_path = "shared/hw2/populationbycountry19802010millions.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Rename the first column to 'Country' for better clarity
df = df.withColumnRenamed("_c0", "Country")

# Filter out 'World' and region entries
regions = ["World", "Africa", "Asia", "Europe", "Latin America and the Caribbean", "Oceania", "Northern America"]
filtered_df = df.filter(~col("Country").isin(regions))

# Calculate the percentage increase in population between 1990 and 2000
result_df = filtered_df.withColumn("percentage_increase", ((col("2000").cast("float") - col("1990").cast("float")) / col("1990").cast("float") * 100))

# Select relevant columns and sort by percentage increase in descending order
sorted_df = result_df.select("Country", "percentage_increase").orderBy(col("percentage_increase").desc())

# Show the top 5 countries with the biggest percentage increase
sorted_df.show(5, truncate=False)


+------------------------+-------------------+
|Country                 |percentage_increase|
+------------------------+-------------------+
|United Arab Emirates    |76.27926665641841  |
|Afghanistan             |63.73220223919031  |
|Western Sahara          |54.71315282296692  |
|Turks and Caicos Islands|52.128594047562025 |
|Cayman Islands          |45.82701410699342  |
+------------------------+-------------------+
only showing top 5 rows



### Question 5 ###



# Restart Kernel
To ensure a clean state, please restart the kernel before proceeding to the next steps.
- In Jupyter Notebook, go to the menu: Kernel -> Restart & Clear Output

or run below code. 


In [8]:
from IPython.display import display
from ipykernel.kernelapp import IPKernelApp

def restart_kernel():
    app = IPKernelApp.instance()
    app.kernel.do_shutdown(True)

restart_kernel()

In [1]:
from pyspark.sql import SparkSession
import re

# Initialize a Spark session
spark = SparkSession.builder.appName("WordCount").getOrCreate()

# Define file paths
file_paths = [
    "shared/hw2/hw1text/20-01.txt",
    "shared/hw2/hw1text/20-02.txt",
    "shared/hw2/hw1text/20-03.txt",
    "shared/hw2/hw1text/20-04.txt",
    "shared/hw2/hw1text/20-05.txt"
]

# Read files into an RDD
rdd = spark.sparkContext.textFile(','.join(file_paths))

# Function to clean and split text
def clean_text(line):
    # Normalize to lower case
    line = line.lower()
    # Replace characters not in the set [0-9a-z] with space
    line = re.sub(r'[^0-9a-z]', ' ', line)
    # Split the line into words
    return line.split()

# Apply the cleaning function and perform word count
word_counts = rdd.flatMap(clean_text) \
                 .map(lambda word: (word, 1)) \
                 .reduceByKey(lambda a, b: a + b)

# Convert to DataFrame for better visualization and usage
word_count_df = word_counts.toDF(["word", "count"])

# Show the DataFrame (or you can save it to a file)
word_count_df.show(truncate=False)

# Stop the Spark session
spark.stop()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/13 23:38:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/13 23:38:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

+----------+-----+
|word      |count|
+----------+-----+
|government|4724 |
|of        |75568|
|was       |10795|
|year      |3760 |
|minister  |1550 |
|us        |3549 |
|to        |89046|
|5         |1778 |
|annually  |41   |
|does      |1360 |
|consume   |30   |
|alarmed   |11   |
|if        |5983 |
|nothing   |361  |
|is        |27601|
|annual    |356  |
|joynews   |2    |
|editor    |266  |
|george    |145  |
|said      |14909|
+----------+-----+
only showing top 20 rows



### Question 6  ###



# Restart Kernel
To ensure a clean state, please restart the kernel before proceeding to the next steps.
- In Jupyter Notebook, go to the menu: Kernel -> Restart & Clear Output

or run below code. 


In [2]:
from IPython.display import display
from ipykernel.kernelapp import IPKernelApp

def restart_kernel():
    app = IPKernelApp.instance()
    app.kernel.do_shutdown(True)

restart_kernel()

In [1]:
from pyspark.sql import SparkSession
import re
from operator import add

# Initialize a Spark session
spark = SparkSession.builder.appName("BigramCount").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Define file paths
file_paths = [
    "shared/hw2/hw1text/20-01.txt",
    "shared/hw2/hw1text/20-02.txt",
    "shared/hw2/hw1text/20-03.txt",
    "shared/hw2/hw1text/20-04.txt",
    "shared/hw2/hw1text/20-05.txt"
]

# Read files into an RDD
rdd = spark.sparkContext.textFile(','.join(file_paths))

# Function to clean and split text into bigrams
def clean_and_split_to_bigrams(line):
    # Normalize to lower case
    line = line.lower()
    # Replace characters not in the set [0-9a-z] with space
    line = re.sub(r'[^0-9a-z\s]', ' ', line)
    # Split the line into words
    words = line.split()
    # Remove single-character words except meaningful ones like 'i', 'a'
    words = [word for word in words if len(word) > 1 or word in ('i', 'a')]
    # Generate bigrams
    bigrams = [(words[i], words[i+1]) for i in range(len(words) - 1)]
    return bigrams

# Apply the cleaning and bigram splitting function
bigrams_rdd = rdd.flatMap(clean_and_split_to_bigrams)

# Map bigrams to key-value pairs and reduce by key to count occurrences
bigram_counts = bigrams_rdd.map(lambda bigram: (bigram, 1)).reduceByKey(add)

# Convert to DataFrame for better visualization and usage
bigram_count_df = bigram_counts.toDF(["bigram", "count"])

# Get the 6 most common bigrams
most_common_bigrams = bigram_count_df.orderBy("count", ascending=False).limit(6)

# Show the most common bigrams
most_common_bigrams.show(truncate=False)

# Stop the Spark session
spark.stop()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/13 23:39:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/13 23:39:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

+-----------+-----+
|bigram     |count|
+-----------+-----+
|{of, the}  |17498|
|{in, the}  |12819|
|{covid, 19}|8762 |
|{to, the}  |8397 |
|{for, the} |5592 |
|{on, the}  |5043 |
+-----------+-----+



### Question 7 ####

In [2]:
import json
from haversine import haversine, Unit

# Load restaurant data
with open('shared/hw2/Restaurants_in_Durham_County_NC.json', 'r') as file:
    restaurant_data = json.load(file)

# Filter active food service restaurants
active_food_service_restaurants = [
    restaurant for restaurant in restaurant_data
    if restaurant['fields'].get('status') == 'ACTIVE' and restaurant['fields'].get('rpt_area_desc') == 'Food Service'
]

# Reference coordinates
reference_coords = (35.994914, -78.897133)

# Find the closest restaurant
closest_restaurant = None
min_distance = float('inf')

for restaurant in active_food_service_restaurants:
    if 'geolocation' in restaurant['fields']:
        coords = restaurant['fields']['geolocation']
        restaurant_coords = (coords[0], coords[1])
        distance = haversine(reference_coords, restaurant_coords, unit=Unit.MILES)
        if distance < min_distance:
            min_distance = distance
            closest_restaurant = restaurant

if closest_restaurant:
    # Display closest restaurant
    restaurant_name = closest_restaurant['fields']['premise_name']
    restaurant_address = closest_restaurant['fields']['premise_address1']
    restaurant_city = closest_restaurant['fields']['premise_city']
    restaurant_zip = closest_restaurant['fields']['premise_zip']
    print("Closest Restaurant:")
    print(f"Name: {restaurant_name}")
    print(f"Address: {restaurant_address}, {restaurant_city}, NC {restaurant_zip}")

    # Load foreclosure data
    with open('shared/hw2/durham-nc-foreclosure-2006-2016.json', 'r') as file:
        foreclosure_data = json.load(file)

    # Extract foreclosure coordinates
    foreclosure_coords = [
        (foreclosure['fields']['geocode'][1], foreclosure['fields']['geocode'][0])
        for foreclosure in foreclosure_data if 'geocode' in foreclosure['fields']
    ]

    # Get the coordinates of the closest restaurant
    restaurant_coords = (closest_restaurant['fields']['geolocation'][0], closest_restaurant['fields']['geolocation'][1])

    # Count foreclosures within 1 mile of the closest restaurant
    foreclosure_count = sum(
        1 for coords in foreclosure_coords
        if haversine(coords, restaurant_coords, unit=Unit.MILES) <= 1
    )

    print(f"Number of foreclosures within 1 mile of {restaurant_name}: {foreclosure_count}")
else:
    print("No active food service restaurants found.")


Closest Restaurant:
Name: OLD HAVANA SANDWICH SHOP
Address: 310 E. MAIN ST., DURHAM, NC 27701
Number of foreclosures within 1 mile of OLD HAVANA SANDWICH SHOP: 0
