# Module 13 Assignment

## _*Make sure to read the instruction document before beginning the assignment_

In [1]:
# Import libraries

import requests
import json
import pandas as pd
from delta import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, count, avg, sum
from datetime import datetime
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

builder = SparkSession.builder.appName("FoursquareDataEnhanced") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.python.worker.timeout", "120") \
    .config("spark.driver.host", "localhost")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Initialize Spark session
# spark = SparkSession.builder \
#     .appName("FoursquareDataEnhanced") \
#     .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow") \
#     .config("spark.executor.extraJavaOptions", "-Djava.security.manager=allow") \
#     .getOrCreate()
# .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
# spark = SparkSession.builder \
#     .config("spark.driver.host", "localhost") \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.python.worker.timeout", "120") \
#     .appName("FoursquareDataEnhanced") \
#     .getOrCreate()

### Question #1 - Initialize Foursquare API (0 points)

In [2]:
# Foursquare API details
api_key = "fsq3tk1Rml2S++Vh6zBYWbYtuigjgDSGxT84gOSBfU4dWyQ="
api_url = "https://api.foursquare.com/v3/places/search"
headers = {
    "Accept": "application/json",
    "Authorization": api_key  #TODO Replace with your Foursquare API Key
}


### Question #2 - Retrieve 1000 Rows of Data (Restaurants) from Foursquare Places in Baltimore (20 points)

In [3]:
# Define search parameters
# Search for restaurants in Baltimore, Maryland
# This will require investigating the data to discover how the fields appear.

# The API is limited to 20 results per request
# If you have limit=20, that means you're getting results 1-20
# Give it offset=21 and you'll get 21-40.

# Parameters for the API request
params = {
    'query': "restaurant", #TODO
    'near': 'Baltimore, MD', #TODO
    'offset': 0,  # Offset the list of returned businesses by this amount.
    'limit': 20   # Set the number of results to retrieve per request (50 is the max)
}

# Make initial API request
response = requests.get(api_url, headers=headers, params=params)
print(response.text)

{"results":[{"fsq_id":"4ad4c014f964a52046ef20e3","categories":[{"id":13236,"name":"Italian Restaurant","short_name":"Italian","plural_name":"Italian Restaurants","icon":{"prefix":"https://ss3.4sqi.net/img/categories_v2/food/italian_","suffix":".png"}}],"chains":[],"closed_bucket":"LikelyOpen","distance":515,"geocodes":{"drop_off":{"latitude":39.294326,"longitude":-76.615485},"main":{"latitude":39.294336,"longitude":-76.61532},"roof":{"latitude":39.294336,"longitude":-76.61532}},"link":"/v3/places/4ad4c014f964a52046ef20e3","location":{"address":"405 N Charles St","address_extended":"Ste 1","census_block":"245100401003020","country":"US","dma":"Baltimore","formatted_address":"405 N Charles St, Baltimore, MD 21201","locality":"Baltimore","postcode":"21201","region":"MD"},"name":"Sotto Sopra","related_places":{},"timezone":"America/New_York"},{"fsq_id":"4aff5b30f964a520ac3722e3","categories":[{"id":13079,"name":"Brazilian Restaurant","short_name":"Brazilian","plural_name":"Brazilian Restau

In [4]:
# Create a loop to retrieve the restaurant data by iteratively updating the 'offset' parameter
# Set the maximum number of results to retrieve to 1000
places_data = []
for offset in range(0, 1000, 20):  # Fetch up to 100 results in batches of 20
    params['offset'] = offset
    response = requests.get(api_url, headers=headers, params=params)
    data = response.json()
    for place in data["results"]:  # Adjust "results" based on response
        places_data.append({
            "fsq_id": place.get("fsq_id"),
            "name": place.get("name"),
            "latitude": place["geocodes"]["main"]["latitude"],
            "longitude": place["geocodes"]["main"]["longitude"],
            "distance": place.get("distance"),
            "category_id": place["categories"][0]["id"] if place.get("categories") else None,
            "category_name": place["categories"][0]["name"] if place.get("categories") else None,
            "region": place["location"].get("region"),
            "locality": place["location"].get("locality"),
            "postcode": place["location"].get("postcode"),
            "address": place["location"].get("formatted_address"),
            "timezone": place.get("timezone"),
        })
places_data

[{'fsq_id': '4ad4c014f964a52046ef20e3',
  'name': 'Sotto Sopra',
  'latitude': 39.294336,
  'longitude': -76.61532,
  'distance': 515,
  'category_id': 13236,
  'category_name': 'Italian Restaurant',
  'region': 'MD',
  'locality': 'Baltimore',
  'postcode': '21201',
  'address': '405 N Charles St, Baltimore, MD 21201',
  'timezone': 'America/New_York'},
 {'fsq_id': '4aff5b30f964a520ac3722e3',
  'name': 'Fogo De Chao',
  'latitude': 39.287094,
  'longitude': -76.607334,
  'distance': 556,
  'category_id': 13079,
  'category_name': 'Brazilian Restaurant',
  'region': 'MD',
  'locality': 'Baltimore',
  'postcode': '21202',
  'address': '600 E Pratt St (at Market St), Baltimore, MD 21202',
  'timezone': 'America/New_York'},
 {'fsq_id': '4afdbf3af964a5207e2a22e3',
  'name': 'Trinacria Macaroni Works',
  'latitude': 39.294222,
  'longitude': -76.622827,
  'distance': 998,
  'category_id': 13039,
  'category_name': 'Deli',
  'region': 'MD',
  'locality': 'Baltimore',
  'postcode': '21201',
 

In [5]:
# Transform data into a list of dictionaries with relevant fields

# Fields to Retrieve from Foursquare Places API (1,000 restaurants total):
#   fsq_id
#   name
#   latitude
#   longitude
#   distance
#   category_id
#   category_name
#   region
#   locality
#   postcode
#   address
#   timezone

places = places_data

### Question #3 - Convert to PySpark DataFrame (10 points)

In [6]:
# Convert Python dictionary to PySpark DataFrame
df_restuarants = pd.DataFrame(places_data)
df = spark.createDataFrame(df_restuarants)

In [7]:
# Add a column with today's date (YYYYMMDD) as 'load_dt'
df = df.withColumn("load_dt", lit(datetime.now().strftime("%Y%m%d")))

In [8]:
# Display() the first 5 records of the PySpark DataFrame
df.limit(5).show()

+--------------------+--------------------+---------+----------+--------+-----------+--------------------+------+---------+--------+--------------------+----------------+--------+
|              fsq_id|                name| latitude| longitude|distance|category_id|       category_name|region| locality|postcode|             address|        timezone| load_dt|
+--------------------+--------------------+---------+----------+--------+-----------+--------------------+------+---------+--------+--------------------+----------------+--------+
|4ad4c014f964a5204...|         Sotto Sopra|39.294336| -76.61532|     515|      13236|  Italian Restaurant|    MD|Baltimore|   21201|405 N Charles St,...|America/New_York|20241201|
|4aff5b30f964a520a...|        Fogo De Chao|39.287094|-76.607334|     556|      13079|Brazilian Restaurant|    MD|Baltimore|   21202|600 E Pratt St (a...|America/New_York|20241201|
|4afdbf3af964a5207...|Trinacria Macaron...|39.294222|-76.622827|     998|      13039|               

### Question #4 - Create Bronze Table (20 points)

In [9]:
# Create the database if it doesn't exist
spark.sql("CREATE DATABASE IF NOT EXISTS places_db")

# Switch to the places_db database
spark.sql("USE places_db")

DataFrame[]

In [10]:
# Write a Bronze table with the raw data to the places_db database with the 'delta' format (overwrite if it already exists)
# The table should be named <your_last_name>_bronze, i.e. 'mosko_bronze'
df.write.format("delta").mode("overwrite").saveAsTable(f"places_db.hatzenbeller_bronze")

Py4JJavaError: An error occurred while calling o76.saveAsTable.
: com.google.common.util.concurrent.ExecutionError: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2261)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
	at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:865)
	at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:875)
	at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:784)
	at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:746)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.$anonfun$createDeltaTable$10(DeltaCatalog.scala:186)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.$anonfun$createDeltaTable$1(DeltaCatalog.scala:184)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.recordFrameProfile(DeltaCatalog.scala:66)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.org$apache$spark$sql$delta$catalog$DeltaCatalog$$createDeltaTable(DeltaCatalog.scala:102)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog$StagedDeltaTableV2.$anonfun$commitStagedChanges$1(DeltaCatalog.scala:601)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.recordFrameProfile(DeltaCatalog.scala:66)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog$StagedDeltaTableV2.commitStagedChanges(DeltaCatalog.scala:587)
	at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:585)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable(WriteToDataSourceV2Exec.scala:578)
	at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable$(WriteToDataSourceV2Exec.scala:572)
	at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.writeToTable(WriteToDataSourceV2Exec.scala:186)
	at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:221)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:645)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:579)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at io.delta.storage.HadoopFileSystemLogStore.listFrom(HadoopFileSystemLogStore.java:59)
	at org.apache.spark.sql.delta.storage.LogStoreAdaptor.listFrom(LogStore.scala:452)
	at org.apache.spark.sql.delta.storage.DelegatingLogStore.listFrom(DelegatingLogStore.scala:127)
	at org.apache.spark.sql.delta.SnapshotManagement.listFrom(SnapshotManagement.scala:86)
	at org.apache.spark.sql.delta.SnapshotManagement.listFrom$(SnapshotManagement.scala:85)
	at org.apache.spark.sql.delta.DeltaLog.listFrom(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.listFromOrNone(SnapshotManagement.scala:103)
	at org.apache.spark.sql.delta.SnapshotManagement.listFromOrNone$(SnapshotManagement.scala:99)
	at org.apache.spark.sql.delta.DeltaLog.listFromOrNone(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.listFromFileSystemInternal(SnapshotManagement.scala:113)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$listDeltaCompactedDeltaAndCheckpointFiles$2(SnapshotManagement.scala:158)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$listDeltaCompactedDeltaAndCheckpointFiles$1(SnapshotManagement.scala:158)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
	at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:136)
	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
	at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:135)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:125)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:115)
	at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.listDeltaCompactedDeltaAndCheckpointFiles(SnapshotManagement.scala:156)
	at org.apache.spark.sql.delta.SnapshotManagement.listDeltaCompactedDeltaAndCheckpointFiles$(SnapshotManagement.scala:151)
	at org.apache.spark.sql.delta.DeltaLog.listDeltaCompactedDeltaAndCheckpointFiles(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.createLogSegment(SnapshotManagement.scala:302)
	at org.apache.spark.sql.delta.SnapshotManagement.createLogSegment$(SnapshotManagement.scala:290)
	at org.apache.spark.sql.delta.DeltaLog.createLogSegment(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$2(SnapshotManagement.scala:578)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
	at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$1(SnapshotManagement.scala:573)
	at org.apache.spark.sql.delta.SnapshotManagement.withSnapshotLockInterruptibly(SnapshotManagement.scala:78)
	at org.apache.spark.sql.delta.SnapshotManagement.withSnapshotLockInterruptibly$(SnapshotManagement.scala:75)
	at org.apache.spark.sql.delta.DeltaLog.withSnapshotLockInterruptibly(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:573)
	at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:572)
	at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:69)
	at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:80)
	at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:853)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:848)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
	at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:651)
	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:136)
	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
	at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:651)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:135)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:125)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:115)
	at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:651)
	at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:847)
	at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$5(DeltaLog.scala:866)
	at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
	... 63 more


In [None]:
# Query all records of the bronze table you created above
bronze_table = spark.sql("SELECT * FROM places_db.hatzenbeller_bronze")

# display() the first 5 records
bronze_table.limit(5).show()

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `places_db`.`hatzenbeller_bronze` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [places_db, hatzenbeller_bronze], [], false


### Question #5 - Create Silver Table (20 points)

In [None]:
# Step 1: Select the relevant columns from the Bronze table
silver_df = spark.sql("""
    SELECT fsq_id, name, distance, category_name, region, locality, postcode, address 
    FROM places_db.hatzenbeller_bronze
""")

# Step 2: Remove duplicate rows from the Silver DataFrame
silver_df = silver_df.dropDuplicates()

# Show the cleaned Silver DataFrame
silver_df.show()


Py4JJavaError: An error occurred while calling o37.sql.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at io.delta.storage.HadoopFileSystemLogStore.listFrom(HadoopFileSystemLogStore.java:59)
	at org.apache.spark.sql.delta.storage.LogStoreAdaptor.listFrom(LogStore.scala:452)
	at org.apache.spark.sql.delta.storage.DelegatingLogStore.listFrom(DelegatingLogStore.scala:127)
	at org.apache.spark.sql.delta.SnapshotManagement.listFrom(SnapshotManagement.scala:86)
	at org.apache.spark.sql.delta.SnapshotManagement.listFrom$(SnapshotManagement.scala:85)
	at org.apache.spark.sql.delta.DeltaLog.listFrom(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.listFromOrNone(SnapshotManagement.scala:103)
	at org.apache.spark.sql.delta.SnapshotManagement.listFromOrNone$(SnapshotManagement.scala:99)
	at org.apache.spark.sql.delta.DeltaLog.listFromOrNone(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.listFromFileSystemInternal(SnapshotManagement.scala:113)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$listDeltaCompactedDeltaAndCheckpointFiles$2(SnapshotManagement.scala:158)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$listDeltaCompactedDeltaAndCheckpointFiles$1(SnapshotManagement.scala:158)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
	at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:136)
	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
	at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:135)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:125)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:115)
	at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.listDeltaCompactedDeltaAndCheckpointFiles(SnapshotManagement.scala:156)
	at org.apache.spark.sql.delta.SnapshotManagement.listDeltaCompactedDeltaAndCheckpointFiles$(SnapshotManagement.scala:151)
	at org.apache.spark.sql.delta.DeltaLog.listDeltaCompactedDeltaAndCheckpointFiles(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.createLogSegment(SnapshotManagement.scala:302)
	at org.apache.spark.sql.delta.SnapshotManagement.createLogSegment$(SnapshotManagement.scala:290)
	at org.apache.spark.sql.delta.DeltaLog.createLogSegment(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.createLogSegment(SnapshotManagement.scala:314)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$updateInternal$1(SnapshotManagement.scala:984)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
	at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:136)
	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
	at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:135)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:125)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:115)
	at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.updateInternal(SnapshotManagement.scala:981)
	at org.apache.spark.sql.delta.SnapshotManagement.updateInternal$(SnapshotManagement.scala:980)
	at org.apache.spark.sql.delta.DeltaLog.updateInternal(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$update$3(SnapshotManagement.scala:932)
	at org.apache.spark.sql.delta.SnapshotManagement.withSnapshotLockInterruptibly(SnapshotManagement.scala:78)
	at org.apache.spark.sql.delta.SnapshotManagement.withSnapshotLockInterruptibly$(SnapshotManagement.scala:75)
	at org.apache.spark.sql.delta.DeltaLog.withSnapshotLockInterruptibly(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$update$2(SnapshotManagement.scala:931)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
	at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.SnapshotManagement.update(SnapshotManagement.scala:931)
	at org.apache.spark.sql.delta.SnapshotManagement.update$(SnapshotManagement.scala:899)
	at org.apache.spark.sql.delta.DeltaLog.update(DeltaLog.scala:74)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.$anonfun$initialSnapshot$4(DeltaTableV2.scala:161)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.$anonfun$initialSnapshot$1(DeltaTableV2.scala:159)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2$.withEnrichedUnsupportedTableException(DeltaTableV2.scala:381)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.initialSnapshot$lzycompute(DeltaTableV2.scala:158)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.initialSnapshot(DeltaTableV2.scala:138)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.$anonfun$tableSchema$2(DeltaTableV2.scala:182)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.tableSchema$lzycompute(DeltaTableV2.scala:182)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.tableSchema(DeltaTableV2.scala:180)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.schema(DeltaTableV2.scala:187)
	at org.apache.spark.sql.connector.catalog.Table.columns(Table.java:65)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.columns(DeltaTableV2.scala:58)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:197)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$createRelation$1(Analyzer.scala:1283)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.createRelation(Analyzer.scala:1248)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$resolveRelation$5(Analyzer.scala:1317)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$resolveRelation$1(Analyzer.scala:1311)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$resolveRelation(Analyzer.scala:1296)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$14.applyOrElse(Analyzer.scala:1153)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$14.applyOrElse(Analyzer.scala:1117)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1216)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1215)
	at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:71)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1117)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1076)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:240)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:236)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:187)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:236)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:202)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:223)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:222)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)


In [None]:
# Write a Silver table to the places_db database with the 'delta' format (overwrite if it already exists)
# The table should be named <your_last_name>_silver, i.e. 'mosko_silver'
#TODO

In [None]:
# Query all records of the silver table you created above
silver_table = #TODO

# display() the first 5 records
#TODO

### Question #6 - Create Gold Table (20 points)

In [None]:
# Write a PySpark query to group the Silver DataFrame (silver_df) by three columns:
#   category_name: The name of the category (e.g., "Restaurant").
#   region: The state or region of the place.
#   locality: The city or locality of the place.
#
# Calculate the following aggregations for each group:
#   num_places: The total number of places in each group (count of fsq_id).
#   avg_distance: The average distance of the places from the query location.
#   unique_postcodes: The number of unique postcodes in each group.
#
# Store the aggregated results in a new DataFrame named gold_df

gold_df = #TODO

In [None]:
# Write a Gold table to the places_db database with the 'delta' format (overwrite if it already exists)
# The table should be named <your_last_name>_gold, i.e. 'mosko_gold'
#TODO

## Question #7 - Query the Gold Table (10 points)

In [None]:
# Query the gold table and store results in a PySpark DataFrame with an avg_distance of less than 3000
query = #TODO
gold_table = spark.sql(query)

# display() all records
#TODO

## Submitting the Assignment
Submit your Jupyter Notebook file with **all cells executed and outputs displayed**.

Export the notebook from Databricks as an html file (File > Export > HTML) and upload to Canvas: \<student-first-intial>\<student-last-name>\-module13.html
    
Upload a Jupyter Notebook with your code: \<student-first-intial>\<student-last-name>\-module13.ipyn