In [0]:
%run "./00 - Setup"

In [0]:
%pip install folium


In [0]:
# Read the target Unity Catalog location from the notebook widgets
# (presumably defined by the "00 - Setup" notebook run above — TODO confirm).
catalog, database, volume = (
    dbutils.widgets.get(widget_name) for widget_name in ("catalog", "database", "volume")
)

In [0]:
%sql
-- Values are substituted from the notebook widgets of the same name.
-- Switch to the target catalog, then ensure the schema and the volume exist (safe to re-run).
USE CATALOG ${catalog};
CREATE SCHEMA IF NOT EXISTS ${database};
CREATE VOLUME IF NOT EXISTS ${database}.${volume};

In [0]:
# Echo the fully qualified <catalog>.<schema> name the tables below are written to.
print(".".join([catalog, database]))

In [0]:
from pyspark.sql.functions import to_date, to_timestamp, col, expr, explode


## Ingest CSV

Geolocated, time-standardized UFO reports from NUFORC (the National UFO Reporting Center), covering close to a century of sightings — more than 80,000 reports.


In [0]:
# Get the current notebook path
current_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()

# Display the current path
display(current_path)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import to_date, to_timestamp, col, expr, monotonically_increasing_id

# Column layout of the headerless NUFORC CSV, applied positionally in file order.
schema = StructType([
    StructField("datetime", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("country", StringType(), True),
    StructField("shape", StringType(), True),
    StructField("duration_seconds", DoubleType(), True),
    StructField("duration_hours_min", StringType(), True),
    StructField("comments", StringType(), True),
    StructField("date_posted", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True)
])

# `{current_path}/../..` resolves to the parent of the folder containing this notebook;
# the CSV is expected in a `data/` folder there.
file_path = f"file:/Workspace/{current_path}/../../data/ufo-scrubbed-geocoded-time-standardized.csv"

# Load the CSV with the explicit schema (no header row in the file).
df = (
    spark.read.format("csv")
    .option("header", "false")
    .schema(schema)
    .load(file_path)
)

# Drop reports without coordinates; they cannot be turned into a point geometry.
df = df.filter(col("latitude").isNotNull() & col("longitude").isNotNull())

# Add a WKT point geometry built from (longitude, latitude).
# NOTE: `datetime` and `date_posted` are left as strings here; `datetime` is parsed
# on the fly by a later query.
df = df.withColumn("geometry", expr("st_aswkt(st_point(longitude, latitude))"))

# Add a unique 64-bit row ID. monotonically_increasing_id() is unique and increasing,
# but NOT consecutive (values jump between partitions) and depends on partitioning.
df = df.withColumn("id", monotonically_increasing_id())

df.display()

In [0]:
# Persist the sightings as a table, replacing whatever a previous run wrote.
sightings_table = f"{catalog}.{database}.ufo_sightings"
df.write.saveAsTable(sightings_table, mode="overwrite")

In [0]:
%sql
-- Number of sightings per reported shape.
-- COUNT(*) rather than COUNT(shape): COUNT(shape) skips NULLs, so the NULL-shape group
-- was reported with a count of 0 instead of its real size.
-- NOTE: the count column is now named sighting_count; re-bind any saved visualization.
SELECT shape, COUNT(*) AS sighting_count
FROM ${catalog}.${database}.ufo_sightings
GROUP BY shape
ORDER BY sighting_count DESC;

Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- Count sightings by time of day. 'Daytime' is hours 6 through 18 inclusive
-- (06:00-18:59); every other parsed hour is 'Nighttime'.
-- try_to_timestamp returns NULL instead of failing on a malformed `datetime`. Those rows
-- are reported as 'Unknown' rather than silently falling into the 'Nighttime' ELSE branch.
-- The M/d/yyyy H:mm pattern accepts both zero-padded and unpadded month/day/hour.
WITH sighting_hours AS (
  SELECT HOUR(try_to_timestamp(datetime, 'M/d/yyyy H:mm')) AS sighting_hour
  FROM ${catalog}.${database}.ufo_sightings
),
classified AS (
  SELECT
    CASE
      WHEN sighting_hour IS NULL THEN 'Unknown'
      WHEN sighting_hour BETWEEN 6 AND 18 THEN 'Daytime'
      ELSE 'Nighttime'
    END AS time_of_day
  FROM sighting_hours
)
SELECT time_of_day, COUNT(*) AS sighting_count
FROM classified
GROUP BY time_of_day;

In [0]:
# Convert Spark DataFrame to Pandas DataFrame
pandas_df = spark.read.table(f"{catalog}.{database}.sightseeings").limit(100).toPandas()

# Import Folium and IPython display
import folium
#from IPython.display import display

# Create a map centered around the average latitude and longitude
map_center = [pandas_df['latitude'].mean(), pandas_df['longitude'].mean()]
m = folium.Map(location=map_center, zoom_start=5)

ufo_icon_url = 'https://cdn-icons-png.flaticon.com/512/3306/3306571.png'  # Replace with the actual URL to your UFO icon

# Add points to the map
for _, row in pandas_df.iterrows():
    folium.Marker(
        location=[row['latitude'], row['longitude']],
        popup=f"Shape: {row['shape']}, Time: {row['datetime']}, Comments: {row['comments']}",
        icon=folium.CustomIcon(ufo_icon_url, icon_size=(30, 30))  # Adjust icon_size as needed
    ).add_to(m)

m

## How to ingest ArcGIS data

    Options:
        - `url` (str): The ArcGIS FeatureServer endpoint without query parameters. **Required**.
        - `where` (str): Where clause used to filter the results. Defaults to `1=1` (all records).
        - `outFields` (str): Determines which fields to include in the response. Defaults to `*` (all fields).
        - `chunkSize` (int): Process records in chunks of `chunkSize` records at a time. Defaults to 1000, which is the maximum supported by ArcGIS Server.

    Example Usage:
        df = (
            spark.read.format("arcgis")
            .option("url", url)
            .option("where", "1=1")
            .option("outFields", "*")
            .option("chunkSize", "1000")
            .load()
        )

## Ingest major cities
US cities with a population greater than 10,000.

In [0]:
# Load major-city features from an ArcGIS FeatureServer layer via the `arcgis` data source.
# The source geometry is GeoJSON text; it is re-encoded as WKT for storage.
major_cities_url = "https://services.arcgis.com/P3ePLMYs2RVChkJx/arcgis/rest/services/USA_Major_Cities_/FeatureServer/0"
cities_raw = spark.read.format("arcgis").option("url", major_cities_url).load()
cities_df = cities_raw.withColumn("geometry", expr("st_aswkt(ST_GeomFromGeoJSON(geometry))"))

cities_df.display()

In [0]:
from pyspark.sql.functions import col, explode, map_keys, lower

# Flatten the `properties` map into one lower-cased top-level column per key, then save.

# Every distinct key that appears in any feature's `properties` map. Sorted because
# distinct()/collect() give no ordering guarantee, which made the saved table's column
# order (and the winner of any lower-case key collision) vary from run to run.
keys_df = cities_df.select(explode(map_keys(col("properties")))).distinct()
keys = sorted(row[0] for row in keys_df.collect())

# Add all columns in a single projection: calling withColumn once per key builds a deeply
# nested plan. As before, if two keys lower-case to the same name the later one wins.
cities_df = cities_df.withColumns(
    {key.lower(): col("properties").getItem(key) for key in keys}
).drop("properties")

cities_df.write.mode("overwrite").saveAsTable(f"{catalog}.{database}.major_cities")

In [0]:
%sql
-- Declare `id` as the primary key of major_cities (a primary key column must be NOT NULL).
-- Drop any existing constraint first, in case the previous run's constraint survived the
-- table overwrite; otherwise ADD CONSTRAINT fails and the notebook cannot be re-run.
ALTER TABLE ${catalog}.${database}.major_cities
DROP CONSTRAINT IF EXISTS major_cities_pk;

ALTER TABLE ${catalog}.${database}.major_cities
ALTER COLUMN id SET NOT NULL;

ALTER TABLE ${catalog}.${database}.major_cities
ADD CONSTRAINT major_cities_pk PRIMARY KEY (id);

## Ingest airports

In [0]:
# Load airport features from an ArcGIS FeatureServer layer via the `arcgis` data source.
airports_url = "https://services6.arcgis.com/ssFJjBXIUyZDrSYZ/arcgis/rest/services/US_Airport/FeatureServer/0"
airport_df = (
    spark.read
    .format("arcgis")
    .option("url", airports_url)
    .load()
)

airport_df.display()

In [0]:
from pyspark.sql.functions import col, explode, map_keys, lower

# Flatten the `properties` map into one lower-cased top-level column per key, then save.

# Every distinct key that appears in any feature's `properties` map. Sorted because
# distinct()/collect() give no ordering guarantee, which made the saved table's column
# order (and the winner of any lower-case key collision) vary from run to run.
keys_df = airport_df.select(explode(map_keys(col("properties")))).distinct()
keys = sorted(row[0] for row in keys_df.collect())

# Add all columns in a single projection: calling withColumn once per key builds a deeply
# nested plan. As before, if two keys lower-case to the same name the later one wins.
airport_df = airport_df.withColumns(
    {key.lower(): col("properties").getItem(key) for key in keys}
).drop("properties")

airport_df.write.mode("overwrite").saveAsTable(f"{catalog}.{database}.airports")

In [0]:
%sql
-- Declare `id` as the primary key of airports (a primary key column must be NOT NULL).
-- Drop any existing constraint first, in case the previous run's constraint survived the
-- table overwrite; otherwise ADD CONSTRAINT fails and the notebook cannot be re-run.
ALTER TABLE ${catalog}.${database}.airports
DROP CONSTRAINT IF EXISTS airports_pk;

ALTER TABLE ${catalog}.${database}.airports
ALTER COLUMN id SET NOT NULL;

ALTER TABLE ${catalog}.${database}.airports
ADD CONSTRAINT airports_pk PRIMARY KEY (id);

## Ingest counties

In [0]:
# Load county reference features from an ArcGIS FeatureServer layer via the `arcgis` data source.
counties_url = "https://services5.arcgis.com/FlidZxdI0LGC9vAw/arcgis/rest/services/US_County_Reference_Data/FeatureServer/0"
counties_df = spark.read.format("arcgis").options(url=counties_url).load()

counties_df.display()

In [0]:
from pyspark.sql.functions import col, explode, map_keys, lower

# Flatten the `properties` map into one lower-cased top-level column per key,
# rename `name` to `county`, then save.

# Every distinct key that appears in any feature's `properties` map. Sorted because
# distinct()/collect() give no ordering guarantee, which made the saved table's column
# order (and the winner of any lower-case key collision) vary from run to run.
keys_df = counties_df.select(explode(map_keys(col("properties")))).distinct()
keys = sorted(row[0] for row in keys_df.collect())

# Add all columns in a single projection: calling withColumn once per key builds a deeply
# nested plan. As before, if two keys lower-case to the same name the later one wins.
counties_df = (
    counties_df.withColumns({key.lower(): col("properties").getItem(key) for key in keys})
    .withColumnRenamed("name", "county")
    .drop("properties")
)

counties_df.write.mode("overwrite").saveAsTable(f"{catalog}.{database}.counties")

In [0]:
%sql
-- Declare `id` as the primary key of counties (a primary key column must be NOT NULL).
-- Drop any existing constraint first, in case the previous run's constraint survived the
-- table overwrite; otherwise ADD CONSTRAINT fails and the notebook cannot be re-run.
ALTER TABLE ${catalog}.${database}.counties
DROP CONSTRAINT IF EXISTS counties_pk;

ALTER TABLE ${catalog}.${database}.counties
ALTER COLUMN id SET NOT NULL;

ALTER TABLE ${catalog}.${database}.counties
ADD CONSTRAINT counties_pk PRIMARY KEY (id);

In [0]:
import json

import folium

# Render every county outline on one Leaflet map.

# Collect only the geometry column to the driver. For the counties layer it is presumably
# still GeoJSON text (unlike the cities layer, it is not converted to WKT above).
pandas_df = counties_df.select("geometry").toPandas()

# Fixed map center over the contiguous United States (not computed from the data).
map_center = [37.0902, -95.7129]
m = folium.Map(location=map_center, zoom_start=5)

# Combine all county geometries into one FeatureCollection and add it as a single GeoJson
# layer. Adding one layer per county row bloats the generated HTML and slows rendering;
# a single layer draws the same shapes with the same default style.
# Rows without a geometry are skipped.
county_features = [
    {
        "type": "Feature",
        "geometry": json.loads(geometry) if isinstance(geometry, str) else geometry,
        "properties": {},
    }
    for geometry in pandas_df["geometry"].dropna()
]
folium.GeoJson({"type": "FeatureCollection", "features": county_features}).add_to(m)

m