In [None]:
# Installs the notebook's third-party dependencies into the kernel's environment.
# NOTE(review): versions are unpinned — TODO pin them so Restart & Run All is reproducible.
%pip install pyspark haversine numpy pandas

In [2]:
import pyspark
from pyspark.sql import SparkSession

# Spark configuration for this notebook.
conf = pyspark.SparkConf()
conf = conf.setAppName("<my-app-name>")
# Optional settings; re-enabling either line also requires `import os`.
# conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4040') ## to setup SPARK UI
# conf = conf.set('spark.jars', os.environ['GRAPHFRAMES_PATH']) ## graphframes in spark configuration

# SparkSession is the unified entry point (SQLContext.getOrCreate is deprecated
# since Spark 3.0). getOrCreate() reuses an already-running context, so
# re-running this cell does not fail with "Cannot run multiple SparkContexts".
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# The underlying SparkContext, kept available for code that needs the raw context.
sc = spark.sparkContext

from pyspark.sql import functions as F
from pyspark.sql.window import Window

24/10/28 14:05:41 WARN Utils: Your hostname, Divyanshs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.208 instead (on interface en0)
24/10/28 14:05:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/28 14:05:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Bakery transactions: column names come from the header row and column types
# are inferred from the data.
bakery = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("data/Bakery.csv")
)

In [4]:
# Monday transactions (Spark dayofweek: 1 = Sunday, 2 = Monday, ..., 7 = Saturday)
# whose clock hour is 7..11 inclusive, i.e. 07:00:00-11:59:59.
filteredBakery = (
    bakery
    .filter(F.dayofweek("Date") == 2)
    .withColumn("Hour-Period", F.hour("Time"))
    .filter((F.col("Hour-Period") >= 7) & (F.col("Hour-Period") <= 11))
    .withColumn("weekday", F.lit("Monday"))
)

# Number of rows per item inside each (Date, hour) bucket.
itemCounts = (
    filteredBakery
    .groupBy("Date", "Hour-Period", "Item", "weekday")
    .agg(F.count("Item").alias("qty"))
)

# row_number() picks an arbitrary winner among rows with equal qty unless the
# ordering is total, so break ties by item name to make the result reproducible.
windowSpec = Window.partitionBy("Date", "Hour-Period").orderBy(F.desc("qty"), F.asc("Item"))

# Keep only the most-sold item of every (Date, hour) bucket.
topItems = (
    itemCounts
    .withColumn("rank", F.row_number().over(windowSpec))
    .filter(F.col("rank") == 1)
    .orderBy("Date", "Hour-Period")
)

topItems.select("Item", "qty", "weekday", "Date", "Hour-Period").show(truncate=False)

[Stage 2:>                                                          (0 + 1) / 1]

+---------+---+-------+----------+-----------+
|Item     |qty|weekday|Date      |Hour-Period|
+---------+---+-------+----------+-----------+
|Coffee   |2  |Monday |2016-10-31|8          |
|Coffee   |11 |Monday |2016-10-31|9          |
|Coffee   |10 |Monday |2016-10-31|10         |
|Coffee   |13 |Monday |2016-10-31|11         |
|Pastry   |1  |Monday |2016-11-07|8          |
|Pastry   |3  |Monday |2016-11-07|9          |
|Coffee   |7  |Monday |2016-11-07|10         |
|Coffee   |10 |Monday |2016-11-07|11         |
|Coffee   |1  |Monday |2016-11-14|7          |
|Medialuna|2  |Monday |2016-11-14|8          |
|Coffee   |5  |Monday |2016-11-14|9          |
|Coffee   |5  |Monday |2016-11-14|10         |
|Bread    |5  |Monday |2016-11-14|11         |
|Coffee   |1  |Monday |2016-11-21|7          |
|Coffee   |2  |Monday |2016-11-21|8          |
|Coffee   |8  |Monday |2016-11-21|9          |
|Coffee   |4  |Monday |2016-11-21|10         |
|Coffee   |4  |Monday |2016-11-21|11         |
|Coffee   |1 

                                                                                

In [5]:
# Label each transaction with a part of the day (by clock hour) and with
# weekday vs. weekend (Spark dayofweek: 1 = Sunday, 7 = Saturday), then count
# rows per item in every (DayType, DayPart) bucket.
# NOTE: "Dinner" is the catch-all branch, so it also covers hours before 06:00.
transformedBakery = (
    bakery
    .withColumn(
        "DayPart",
        F.when((F.hour("Time") >= 6) & (F.hour("Time") < 11), "Breakfast")
         .when((F.hour("Time") >= 11) & (F.hour("Time") < 16), "Lunch")
         .otherwise("Dinner")
    )
    .withColumn(
        "DayType",
        F.when((F.dayofweek("Date") == 7) | (F.dayofweek("Date") == 1), "Weekend")
         .otherwise("Weekday")
    )
    .groupBy("DayType", "DayPart", "Item")
    .agg(F.count("Item").alias("qty"))
)

# Break qty ties by item name so row_number() selects the same top-2 every run.
windowSpec = Window.partitionBy("DayType", "DayPart").orderBy(F.desc("qty"), F.asc("Item"))

rankedBakery = transformedBakery.withColumn("Rank", F.row_number().over(windowSpec))

top2Bakery = rankedBakery.filter(F.col("Rank") <= 2)

# collect_list gives no ordering guarantee after the shuffle, so collect
# (Rank, Item) structs, sort them (structs compare by their first field, Rank),
# and project out the item names: most popular item first.
finalResult = (
    top2Bakery
    .groupBy("DayType", "DayPart")
    .agg(
        F.sort_array(F.collect_list(F.struct("Rank", "Item")))
        .getField("Item")
        .alias("Top2Items")
    )
)

finalResult.select("DayType", "DayPart", "Top2Items") \
    .orderBy("DayType", "DayPart") \
    .show(truncate=False)


+-------+---------+---------------+
|DayType|DayPart  |Top2Items      |
+-------+---------+---------------+
|Weekday|Breakfast|[Coffee, Bread]|
|Weekday|Dinner   |[Coffee, Bread]|
|Weekday|Lunch    |[Coffee, Bread]|
|Weekend|Breakfast|[Coffee, Bread]|
|Weekend|Dinner   |[Coffee, Bread]|
|Weekend|Lunch    |[Coffee, Bread]|
+-------+---------+---------------+



In [6]:
# Durham County restaurant records. The JSON reader infers the schema on its
# own and has no header row, so CSV options such as header/inferSchema do not
# apply here.
restaurants = spark.read.json("data/Restaurants_in_Durham_County_NC.json")
# restaurants.printSchema()

In [7]:
# Number of records per report area, read from the nested fields.rpt_area_desc.
rpt_area = F.col("fields.rpt_area_desc")
count_by_rpt = restaurants.groupBy(rpt_area).count()
count_by_rpt.show(truncate=False)

+---------------------+-----+
|rpt_area_desc        |count|
+---------------------+-----+
|Bed&Breakfast Home   |4    |
|Summer Camps         |4    |
|Institutions         |30   |
|Local Confinement    |2    |
|Mobile Food          |147  |
|School Buildings     |89   |
|Summer Food          |242  |
|Swimming Pools       |420  |
|Day Care             |173  |
|Tattoo Establishments|36   |
|Residential Care     |154  |
|Bed&Breakfast Inn    |2    |
|Adult Day Care       |5    |
|Lodging              |62   |
|Food Service         |1093 |
+---------------------+-----+



In [8]:
# Both "NA" and "--" mark missing values in this file. The CSV reader keeps a
# single nullValue (a second .option("nullValue", ...) overwrites the first),
# so "--" is handled by the reader and "NA" is mapped to null explicitly below.
population = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .option("nullValue", "--")
    .csv("data/populationbycountry19802010millions.csv")
    # The region-name column has no usable header, so Spark names it "_c0".
    .withColumnRenamed("_c0", "Region")
    .replace("NA", None)
    .replace("--", None)
)

selected_columns = ["Region", "1990", "2000"]

# Country-level rows only, and only where both years are present.
sample_population = (
    population.select(*selected_columns)
    .filter(F.col("Region") != "World")
    .na.drop(subset=["1990", "2000"])
)

# Cast the year columns to double; anything that still fails to parse becomes 0.0.
for column in ["1990", "2000"]:
    sample_population = sample_population.withColumn(
        column,
        F.coalesce(F.col(column).cast("double"), F.lit(0.0))
    )

# Percentage change 1990 -> 2000; a zero 1990 value yields 0 instead of dividing by zero.
percentage_change = sample_population.withColumn(
    "PercentageChange",
    F.when(F.col("1990") == 0, 0.0)
     .otherwise(((F.col("2000") - F.col("1990")) / F.col("1990")) * 100)
)

# Largest increase and largest decrease; Region breaks ties deterministically.
max_increase = percentage_change.orderBy(F.desc("PercentageChange"), F.asc("Region")).limit(1)
max_decrease = percentage_change.orderBy(F.asc("PercentageChange"), F.asc("Region")).limit(1)

biggest_delta = max_increase.union(max_decrease)

biggest_delta.select("Region", "PercentageChange").show()

+--------------------+------------------+
|              Region|  PercentageChange|
+--------------------+------------------+
|United Arab Emirates| 76.27926078028749|
|          Montserrat|-63.18732525629077|
+--------------------+------------------+



24/10/28 14:05:53 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: --, 1990, 2000
 Schema: _c0, 1990, 2000
Expected: _c0 but found: --
CSV file: file:///Users/divyansh/Programming/bid-data/hw2/data/populationbycountry19802010millions.csv
24/10/28 14:05:53 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: --, 1990, 2000
 Schema: _c0, 1990, 2000
Expected: _c0 but found: --
CSV file: file:///Users/divyansh/Programming/bid-data/hw2/data/populationbycountry19802010millions.csv


In [9]:
# One row per line of text, in the "value" column.
text_df = spark.read.text("data/hw1text")

# Normalise each line:
#   1. lowercase it;
#   2. replace every character outside [0-9a-z] with a space;
#   3. collapse whitespace runs to a single space;
#   4. trim the ends, so splitting on " " downstream does not create empty
#      leading/trailing tokens.
processed_df = text_df.select(
    F.trim(
        F.regexp_replace(
            F.regexp_replace(F.lower(F.col("value")), "[^0-9a-z]", " "),
            "\\s+", " "
        )
    ).alias("cleaned_text")
)

In [10]:
# One row per word. Blank lines (and any leading/trailing space) split into an
# empty string, which is not a word, so drop those before counting.
words_df = (
    processed_df
    .select(F.explode(F.split(F.col("cleaned_text"), " ")).alias("word"))
    .filter(F.col("word") != "")
)

word_count_df = words_df.groupBy("word").count()

word_count_df.orderBy("count", ascending=False).show()



+----+------+
|word| count|
+----+------+
| the|163547|
|  to| 89046|
|   p| 78664|
|  of| 75568|
| and| 72730|
|  in| 56782|
|   a| 53198|
| for| 29770|
|that| 28852|
|  is| 27601|
|  on| 24485|
|   s| 23615|
|with| 19575|
| are| 19417|
|  it| 18231|
|  be| 17998|
|  as| 17796|
|have| 16188|
|  at| 15965|
|  we| 15754|
+----+------+
only showing top 20 rows



                                                                                

In [11]:
from pyspark.ml.feature import NGram, RegexTokenizer, Tokenizer

# Split on whitespace runs and drop empty tokens (minTokenLength=1). The plain
# Tokenizer splits on each individual whitespace character, so a line with a
# leading space produces an empty token and bogus bigrams such as " p".
# RegexTokenizer lowercases by default, matching Tokenizer.
tokenizer = RegexTokenizer(
    inputCol="cleaned_text", outputCol="words", pattern="\\s+", minTokenLength=1
)
tokenized_df = tokenizer.transform(processed_df)

# Adjacent word pairs within each line (pairs do not span line boundaries).
ngram = NGram(n=2, inputCol="words", outputCol="bigrams")
ngram_df = ngram.transform(tokenized_df)

bigrams_df = ngram_df.select(F.explode(F.col("bigrams")).alias("bigram"))
bigram_count_df = bigrams_df.groupBy("bigram").count()

top_bigrams = bigram_count_df.orderBy(F.desc("count")).limit(10)

top_bigrams.show(truncate=False)



+--------+-----+
|bigram  |count|
+--------+-----+
|of the  |17484|
|in the  |12808|
|p the   |10363|
|covid 19|8762 |
|to the  |8372 |
|for the |5588 |
|n t     |5393 |
|on the  |5032 |
|to be   |4581 |
|will be |4177 |
+--------+-----+



                                                                                

In [12]:
# Durham foreclosure records (2006-2016). The JSON reader infers the schema on
# its own and has no header row, so CSV options such as header/inferSchema do
# not apply here.
foreclosure = spark.read.json("data/durham-nc-foreclosure-2006-2016.json")

# foreclosure.printSchema()
# restaurants.printSchema()


In [13]:
import pandas as pd
from haversine import haversine, Unit
from pyspark.sql.types import DoubleType

# (latitude, longitude) of the reference point distances are measured from.
target_location = (35.994914, -78.897133)

# Candidates: currently active Food Service establishments.
filtered_df = restaurants.filter(
    (F.col("fields.status") == "ACTIVE") &
    (F.col("fields.rpt_area_desc") == "Food Service")
)


def haversine_distance(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles between (lat1, lon1) and (lat2, lon2).

    Returns None if any coordinate is missing. Spark passes SQL NULLs to Python
    UDFs as None, and haversine() would fail on them, failing the whole job.
    """
    if any(value is None for value in (lat1, lon1, lat2, lon2)):
        return None
    return haversine((lat1, lon1), (lat2, lon2), unit=Unit.MILES)


haversine_udf = F.udf(haversine_distance, DoubleType())

# geometry.coordinates is stored as [longitude, latitude].
restaurants_with_coordinates = (
    filtered_df
    .withColumn("latitude", F.col("geometry.coordinates")[1])
    .withColumn("longitude", F.col("geometry.coordinates")[0])
    .filter(F.col("latitude").isNotNull() & F.col("longitude").isNotNull())
)

restaurants_with_distances = restaurants_with_coordinates.withColumn(
    "distance",
    haversine_udf(
        F.col("latitude"),
        F.col("longitude"),
        F.lit(target_location[0]),
        F.lit(target_location[1])
    )
)

closest_restaurant = restaurants_with_distances.orderBy("distance").limit(1)

# Show every column of the (single, wide) result row without truncation.
pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)
pd.set_option('display.max_colwidth', None)
closest_restaurant.toPandas()

                                                                                

Unnamed: 0,datasetid,fields,geometry,record_timestamp,recordid,latitude,longitude,distance
0,restaurants-data,"(None, Full-Service Restaurant, [35.9932826, -78.8981331], None, 85098, 4, 2011-01-10, 310 E. MAIN ST., None, DURHAM, OLD HAVANA SANDWICH SHOP, (919) 667-9525, NC, 27701, 4, Food Service, 44, 3 - Municipal/Community, NO, ACTIVE, FOOD, 1 - Restaurant, 5 - Municipal/Community)","([-78.8981331, 35.9932826], Point)",2017-07-13T09:15:31-04:00,2ea5f1269a5c78304997d6fa69de63c7ab18e08e,35.993283,-78.898133,0.125822


In [14]:
# geometry.coordinates is stored as [longitude, latitude].
foreclosures_df = (
    foreclosure
    .withColumn("latitude", F.col("geometry.coordinates")[1])
    .withColumn("longitude", F.col("geometry.coordinates")[0])
)

foreclosures_cleaned = foreclosures_df.filter(
    F.col("latitude").isNotNull() & F.col("longitude").isNotNull()
)

# Anchor point: the restaurant closest to target_location. first() returns None
# for an empty frame, so fail with a clear message instead of an unpacking TypeError.
closest_row = closest_restaurant.select("latitude", "longitude").first()
if closest_row is None:
    raise ValueError("closest_restaurant is empty: no active Food Service restaurant with coordinates")
latitude, longitude = closest_row

foreclosures_with_distance = foreclosures_cleaned.withColumn(
    "distance",
    haversine_udf(
        F.col("latitude"),
        F.col("longitude"),
        F.lit(latitude),
        F.lit(longitude)
    )
)

# Foreclosures within 1 mile (haversine_udf returns miles) of that restaurant.
foreclosures_within_one_mile = foreclosures_with_distance.filter(F.col("distance") <= 1)

# Number of such foreclosures.
foreclosures_within_one_mile.count()

320