In [0]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
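# API documentation
# One Call API 3.0: https://openweathermap.org/api/one-call-3#data
# Current Weather Data API (the /data/2.5/weather endpoint this notebook calls): https://openweathermap.org/current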


In [0]:
# Read the OpenWeather API key from the Databricks secret scope.
kv = "AdkinsOpenWeatherAPI"
api_key = dbutils.secrets.get(scope=kv, key="AdkinsOpenWeatherAPIKey")

# Never echo the secret into notebook output (outputs are saved and shared with
# the notebook); only confirm that a non-empty value was returned.
print(f"OpenWeather API key loaded: {bool(api_key)}")


In [0]:
# Cities to fetch weather for, in the "city,state code,country code" form
# passed as the `q` query parameter of the weather request.
cities = [
    "Grand Rapids,MI,US",
    "Charleston,SC,US",
    "Chicago,IL,US",
    "Johnson City,TN,US",
    "New Orleans,LA,US",
    "Asheville,NC,US",  # Asheville is in North Carolina (was listed as SC)
]

In [0]:
# Column layout of the weather DataFrame: one nullable column per value
# extracted from each API response.
schema = (
    StructType()
    .add("city", StringType(), True)
    .add("temperature", FloatType(), True)  # Fahrenheit (requests use imperial units)
    .add("humidity", IntegerType(), True)
    .add("weather", StringType(), True)
)

In [0]:
# Accumulator for the per-city rows; starts out empty.
all_city_data = list()

In [0]:
# Fetch current conditions for every configured city and collect one row per city.
# The accumulator is reset here so that re-running this cell does not append
# duplicate rows to a list left over from a previous run.
all_city_data = []

# HTTPS keeps the API key (sent as a query parameter) from travelling in clear text.
weather_url = "https://api.openweathermap.org/data/2.5/weather"

for city in cities:
    # Let requests build and URL-encode the query string (e.g. the space in
    # "Johnson City"); units=imperial returns temperatures in Fahrenheit.
    response = requests.get(
        weather_url,
        params={"q": city, "units": "imperial", "appid": api_key},
        timeout=10,  # do not let one stalled request hang the whole notebook
    )
    # Surface HTTP failures (bad key, unknown city, rate limit) as a clear
    # HTTPError instead of a KeyError on the error payload further down.
    response.raise_for_status()
    data = response.json()

    main = data.get("main") or {}
    conditions = data.get("weather") or [{}]
    temp = main.get("temp")
    humidity = main.get("humidity")

    # Row order matches the schema: city, temperature, humidity, weather.
    city_data = (
        data.get("name"),
        # JSON whole numbers (e.g. 72) parse as int, which a FloatType column
        # rejects during schema verification, so coerce explicitly.
        float(temp) if temp is not None else None,
        int(humidity) if humidity is not None else None,
        conditions[0].get("description"),
    )
    all_city_data.append(city_data)

# Create a Spark DataFrame using the collected rows and the declared schema
weather_df = spark.createDataFrame(all_city_data, schema)

# Show the weather data for all cities
weather_df.show()

In [0]:
from pyspark.sql.functions import current_timestamp

# Add a request_time column populated by Spark's current_timestamp().
weather_df = weather_df.withColumn("request_time", current_timestamp())

# Append this batch to the Delta table that holds the accumulated history.
(
    weather_df.write
    .format("delta")
    .mode("append")
    .saveAsTable("weather.conditions.historical_weather")
)

In [0]:
%sql
-- All stored readings, newest first and then by city, with the request date
-- (request_time cast to DATE) as the leading column.
SELECT
  CAST(request_time AS DATE),
  *
FROM weather.conditions.historical_weather
ORDER BY request_time DESC, city ASC

Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- Every row of weather.conditions.weather_data, ordered by city.
-- NOTE(review): no cell shown in this notebook writes this table (the append
-- cell targets weather.conditions.historical_weather) - confirm it is the intended source.
SELECT *
FROM weather.conditions.weather_data
ORDER BY city ASC