In [0]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json, col
from pyspark.sql.types import *

In [0]:
api_key = "74255e561240d7b4e36b7cdd4c28741e"

def get_weather_data(city):
    base_url = "http://api.openweathermap.org/data/2.5/weather"
    params = {
        "q": city,
        "appid": api_key,
        "units": "metric"
    }
    response = requests.get(base_url, params=params)
    if response.status_code == 200:
        return response.text
    else:
        return None

udf_get_weather_data = udf(get_weather_data)

In [0]:
all_cities_df = spark.read.format("delta").load("/user/hive/warehouse/cities")

cities_df = all_cities_df.orderBy("id").limit(5)

display(cities_df)

id,lat,lon,name,country
1282616.0,27.98333,83.76667,Wāliṅ,NP
1282621.0,27.766666,84.566666,Upardang Gadhi,NP
1282635.0,28.130989,82.297256,Tulsīpur,NP
1282665.0,27.633333,84.5,Tikoli,NP
1282666.0,28.5,81.133331,Ṭikāpur,NP


In [0]:
weather_df = cities_df.withColumn("weather_data", udf_get_weather_data(cities_df.name))

weather_schema = StructType([
    StructField('coord', StructType([
        StructField('lon', FloatType(), True),
        StructField('lat', FloatType(), True)
    ])),
    StructField('weather', ArrayType(StructType([
        StructField('id', IntegerType(), True),
        StructField('main', StringType(), True),
        StructField('description', StringType(), True),
        StructField('icon', StringType(), True)
    ])), True),
    StructField('base', StringType(), True),
    StructField('main', StructType([
        StructField('temp', FloatType(), True),
        StructField('feels_like', FloatType(), True),
        StructField('temp_min', FloatType(), True),
        StructField('temp_max', FloatType(), True),
        StructField('pressure', IntegerType(), True),
        StructField('humidity', IntegerType(), True),
        StructField('sea_level', IntegerType(), True),
        StructField('grnd_level', IntegerType(), True)
    ])),
    StructField('visibility', IntegerType(), True),
    StructField('wind', StructType([
        StructField('speed', FloatType(), True),
        StructField('deg', FloatType(), True),
        StructField('gust', FloatType(), True)
    ])),
    StructField('clouds', StructType([
        StructField('all', IntegerType(), True)
    ])),
    StructField('dt', IntegerType(), True),
    StructField('sys', StructType([
        StructField('type', IntegerType(), True),
        StructField('id', IntegerType(), True),
        StructField('message', FloatType(), True),
        StructField('country', StringType(), True),
        StructField('sunrise', IntegerType(), True),
        StructField('sunset', IntegerType(), True)
    ])),
    StructField('timezone', IntegerType(), True),
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('cod', IntegerType(), True)
])

weather_df = weather_df.withColumn("weather_data", from_json(col("weather_data"), weather_schema))

weather_df = weather_df.withColumn("weather_temp", weather_df.weather_data["main"]["temp"])
weather_df = weather_df.withColumn("weather_description", weather_df.weather_data["weather"][0]["description"])

In [0]:
display(weather_df)

id,lat,lon,name,country,weather_data,weather_temp,weather_description
1282616.0,27.98333,83.76667,Wāliṅ,NP,"List(List(83.7667, 27.9833), List(List(800, Clear, clear sky, 01d)), stations, List(29.83, 30.51, 29.83, 29.83, 1001, 48, 1001, 918), 10000, List(1.32, 151.0, 1.17), List(0), 1686533468, List(null, null, null, NP, 1686526086, 1686576084), 20700, 1282616, Wāliṅ, 200)",29.83,clear sky
1282621.0,27.766666,84.566666,Upardang Gadhi,NP,"List(List(84.5667, 27.7667), List(List(800, Clear, clear sky, 01d)), stations, List(26.36, 26.36, 26.36, 26.36, 1001, 55, 1001, 887), 10000, List(0.13, 300.0, 0.44), List(4), 1686533468, List(null, null, null, NP, 1686525924, 1686575862), 20700, 1282621, Upardang Gadhi, 200)",26.36,clear sky
1282635.0,28.130989,82.297256,Tulsīpur,NP,"List(List(82.2973, 28.131), List(List(800, Clear, clear sky, 01d)), stations, List(31.75, 30.54, 31.75, 31.75, 1001, 30, 1001, 929), 10000, List(2.34, 149.0, 2.45), List(2), 1686533469, List(null, null, null, NP, 1686526419, 1686576457), 20700, 1282635, Tulsīpur, 200)",31.75,clear sky
1282665.0,27.633333,84.5,Tikoli,NP,"List(List(84.5, 27.6333), List(List(800, Clear, clear sky, 01d)), stations, List(31.16, 34.44, 31.16, 31.16, 1000, 57, 1000, 978), 10000, List(0.58, 74.0, 2.04), List(2), 1686533469, List(null, null, null, NP, 1686525958, 1686575860), 20700, 1282665, Tikoli, 200)",31.16,clear sky
1282666.0,28.5,81.133331,Ṭikāpur,NP,"List(List(81.1333, 28.5), List(List(800, Clear, clear sky, 01d)), stations, List(34.58, 33.54, 34.58, 34.58, 998, 27, 998, 980), 10000, List(0.75, 327.0, 1.08), List(1), 1686533469, List(null, null, null, NP, 1686526647, 1686576787), 20700, 1282666, Ṭikāpur, 200)",34.58,clear sky


In [0]:
# Flatten the weather_data column
weather_df = weather_df.withColumn("coord_lon", weather_df.weather_data["coord"]["lon"])
weather_df = weather_df.withColumn("coord_lat", weather_df.weather_data["coord"]["lat"])
weather_df = weather_df.withColumn("weather_id", weather_df.weather_data["weather"][0]["id"])
weather_df = weather_df.withColumn("weather_main", weather_df.weather_data["weather"][0]["main"])
weather_df = weather_df.withColumn("weather_description", weather_df.weather_data["weather"][0]["description"])
weather_df = weather_df.withColumn("weather_icon", weather_df.weather_data["weather"][0]["icon"])
weather_df = weather_df.withColumn("base", weather_df.weather_data["base"])
weather_df = weather_df.withColumn("temp", weather_df.weather_data["main"]["temp"])
weather_df = weather_df.withColumn("feels_like", weather_df.weather_data["main"]["feels_like"])
weather_df = weather_df.withColumn("temp_min", weather_df.weather_data["main"]["temp_min"])
weather_df = weather_df.withColumn("temp_max", weather_df.weather_data["main"]["temp_max"])
weather_df = weather_df.withColumn("pressure", weather_df.weather_data["main"]["pressure"])
weather_df = weather_df.withColumn("humidity", weather_df.weather_data["main"]["humidity"])
weather_df = weather_df.withColumn("sea_level", weather_df.weather_data["main"]["sea_level"])
weather_df = weather_df.withColumn("grnd_level", weather_df.weather_data["main"]["grnd_level"])
weather_df = weather_df.withColumn("visibility", weather_df.weather_data["visibility"])
weather_df = weather_df.withColumn("wind_speed", weather_df.weather_data["wind"]["speed"])
weather_df = weather_df.withColumn("wind_deg", weather_df.weather_data["wind"]["deg"])
weather_df = weather_df.withColumn("wind_gust", weather_df.weather_data["wind"]["gust"])
weather_df = weather_df.withColumn("clouds_all", weather_df.weather_data["clouds"]["all"])
weather_df = weather_df.withColumn("dt", weather_df.weather_data["dt"])
weather_df = weather_df.withColumn("sys_country", weather_df.weather_data["sys"]["country"])
weather_df = weather_df.withColumn("sunrise", weather_df.weather_data["sys"]["sunrise"])
weather_df = weather_df.withColumn("sunset", weather_df.weather_data["sys"]["sunset"])
weather_df = weather_df.withColumn("timezone", weather_df.weather_data["timezone"])
weather_df = weather_df.withColumn("cod", weather_df.weather_data["cod"])

# Drop the original weather_data column
weather_df = weather_df.drop("weather_data")

In [0]:
display(weather_df)

id,lat,lon,name,country,weather_temp,weather_description,coord_lon,coord_lat,weather_id,weather_main,weather_icon,base,temp,feels_like,temp_min,temp_max,pressure,humidity,sea_level,grnd_level,visibility,wind_speed,wind_deg,wind_gust,clouds_all,dt,sys_country,sunrise,sunset,timezone,cod
1282616.0,27.98333,83.76667,Wāliṅ,NP,29.83,clear sky,83.7667,27.9833,800,Clear,01d,stations,29.83,30.51,29.83,29.83,1001,48,1001,918,10000,1.32,151.0,1.17,0,1686533517,NP,1686526086,1686576084,20700,200
1282621.0,27.766666,84.566666,Upardang Gadhi,NP,26.36,clear sky,84.5667,27.7667,800,Clear,01d,stations,26.36,26.36,26.36,26.36,1001,55,1001,887,10000,0.13,300.0,0.44,4,1686533468,NP,1686525924,1686575862,20700,200
1282635.0,28.130989,82.297256,Tulsīpur,NP,31.75,clear sky,82.2973,28.131,800,Clear,01d,stations,31.75,30.54,31.75,31.75,1001,30,1001,929,10000,2.34,149.0,2.45,2,1686533469,NP,1686526419,1686576457,20700,200
1282665.0,27.633333,84.5,Tikoli,NP,31.16,clear sky,84.5,27.6333,800,Clear,01d,stations,31.16,34.44,31.16,31.16,1000,57,1000,978,10000,0.58,74.0,2.04,2,1686533517,NP,1686525958,1686575860,20700,200
1282666.0,28.5,81.133331,Ṭikāpur,NP,34.58,clear sky,81.1333,28.5,800,Clear,01d,stations,34.58,33.54,34.58,34.58,998,27,998,980,10000,0.75,327.0,1.08,1,1686533469,NP,1686526647,1686576787,20700,200


In [0]:
weather_df.write.mode("append").parquet("/tmp/weather_dim.parquet")