In [146]:
import requests
import json
from pyspark.sql.functions import col, current_timestamp
from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, ArrayType, StructType, MapType



## Adding target container path for ingestion

In [147]:
# Destination root for ingested data: the `raw` container addressed through an
# abfss:// URI. Dataset folder names are appended directly to this prefix, so
# the trailing slash is significant.
Ingestion_base_path = (
    "abfss://raw@datademosynapse.dfs.core.windows.net/"
)


## Defining the base URL and retrieving the API key from Key Vault

In [148]:
# Endpoint for the weather lookup; the query string is added per request.
api_base_url = "https://api.openweathermap.org/data/2.5/weather"

# Pull the API key from the secret store at run time so it never appears in the
# notebook. NOTE(review): the three arguments look like (key vault name, secret
# name, linked service name) - confirm against the workspace configuration.
api_key = mssparkutils.credentials.getSecret(
    'glocalazure2023',
    'weatherapikey',
    'AzureKeyVault1',
)


## Reusable base function that takes the city name as a parameter

In [149]:
def fetch_weather_data(url, api_key, city, timeout=10):
    """Fetch the weather payload for a single city.

    Args:
        url: Endpoint to call, without a query string.
        api_key: API key, sent as the ``appid`` query parameter.
        city: City name, sent as the ``q`` query parameter.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body when the server answers HTTP 200; otherwise the
        raw response object so the caller can inspect the failure.

    Raises:
        requests.exceptions.RequestException: On connection errors or when the
            timeout expires.
    """
    # Hand the parameters to requests instead of formatting them into the URL:
    # reserved characters such as "&", "#" or "+" in a city name or key are then
    # percent-encoded rather than silently changing the query.
    query_params = {"q": city, "appid": api_key}
    # Without a timeout a stalled connection would block its worker thread forever.
    response = requests.get(url, params=query_params, timeout=timeout)

    if response.status_code == 200:
        return response.json()
    return response

## List of cities

In [150]:
 #City list for weather data retrieval
# Each entry is sent as-is as the city name of one API request.
city_list = [
    "Oslo",
    "Bergen",
    "Mumbai",
    "Jaipur",
    "Dubai",
]


## Defining another function to call base function in parallel

In [151]:
# Retrieve weather data for each city in parallel
def fetch_weather_data_parallel(city_list):
    """Fetch weather payloads for several cities concurrently.

    One ``fetch_weather_data`` call is submitted per city to a thread pool,
    using the notebook-level ``api_base_url`` and ``api_key``.

    Args:
        city_list: Iterable of city names.

    Returns:
        A list of payload dicts in completion order (not input order). Cities
        whose request did not yield a dict payload are reported on stdout and
        left out, so the list only contains records that can be parsed as
        weather data.

    Raises:
        Exception: Whatever a worker raised (e.g. a network error) is re-raised
            here by ``future.result()``.
    """
    with ThreadPoolExecutor() as executor:
        # Map each future back to its city so failures can be reported by name.
        city_by_future = {
            executor.submit(fetch_weather_data, api_base_url, api_key, city): city
            for city in city_list
        }

        weather_data = []
        for future in as_completed(city_by_future):
            payload = future.result()
            if isinstance(payload, dict):
                weather_data.append(payload)
                continue
            # fetch_weather_data hands back the raw response object on a
            # non-200 answer; that is not a weather record, so skip it.
            status = getattr(payload, "status_code", "unknown")
            print(
                f"Skipping {city_by_future[future]!r}: "
                f"request failed (HTTP status {status})"
            )

    return weather_data

In [152]:
# Run the concurrent fetch for every configured city; results arrive in
# completion order, not in city_list order.
weather_data = fetch_weather_data_parallel(city_list)

## Reading weather data using RDD to handle JSON data

In [153]:
# Explicit schema for one weather record. Every field is nullable; nested JSON
# objects are structs and `weather` is an array of structs.
weather_schema = (
    StructType()
    .add("base", StringType(), True)
    .add("clouds", StructType().add("all", LongType(), True), True)
    .add("cod", LongType(), True)
    .add(
        "coord",
        StructType()
        .add("lat", DoubleType(), True)
        .add("lon", DoubleType(), True),
        True,
    )
    .add("dt", LongType(), True)
    .add("id", LongType(), True)
    .add(
        "main",
        StructType()
        .add("feels_like", DoubleType(), True)
        .add("grnd_level", LongType(), True)
        .add("humidity", LongType(), True)
        .add("pressure", LongType(), True)
        .add("sea_level", LongType(), True)
        .add("temp", DoubleType(), True)
        .add("temp_max", DoubleType(), True)
        .add("temp_min", DoubleType(), True),
        True,
    )
    .add("name", StringType(), True)
    .add(
        "sys",
        StructType()
        .add("country", StringType(), True)
        .add("id", LongType(), True)
        .add("sunrise", LongType(), True)
        .add("sunset", LongType(), True)
        .add("type", LongType(), True),
        True,
    )
    .add("timezone", LongType(), True)
    .add("visibility", LongType(), True)
    .add(
        "weather",
        ArrayType(
            StructType()
            .add("description", StringType(), True)
            .add("icon", StringType(), True)
            .add("id", LongType(), True)
            .add("main", StringType(), True),
            True,
        ),
    )
    .add(
        "wind",
        StructType()
        .add("deg", LongType(), True)
        .add("gust", DoubleType(), True)
        .add("speed", DoubleType(), True),
        True,
    )
)

In [154]:
 #Create the DataFrame
# spark.read.json parses an RDD of JSON *strings*. Serialise every payload with
# json.dumps: when given dicts, PySpark falls back to str(dict), which is a
# Python repr (single quotes, True/False/None) rather than JSON. Entries that
# are not dicts (e.g. the response object of a failed request) carry no weather
# record and are left out.
weather_rdd = sc.parallelize(
    [json.dumps(record) for record in weather_data if isinstance(record, dict)]
)
weather_df = spark.read.json(weather_rdd, schema=weather_schema)


In [155]:
# Print the schema of weather_df as a tree.
weather_df.printSchema()

In [156]:
# Show at most 10 rows of the parsed (still nested) weather data.
display(weather_df.limit(10))

## Flattening the Weather data

In [157]:

# Flatten the nested struct columns into top-level columns.
#
# All new columns are produced by a single select() rather than one withColumn()
# per field: each withColumn() adds its own projection to the query plan. The
# resulting columns and their order are the same: the existing columns first
# ("*"), then the flattened ones in the order listed here.
#
# - "parent.child" struct fields become "parent_child" columns.
# - Only the first element of the `weather` array is kept, as "weather_<field>".
# - The original nested columns are dropped afterwards; drop() ignores names
#   that are not present in the DataFrame.
flattened_weather_df = (
    weather_df
    .select(
        "*",
        *[
            col(path).alias(path.replace(".", "_"))
            for path in (
                "clouds.all",
                "coord.lon",
                "coord.lat",
                "main.temp",
                "main.feels_like",
                "main.temp_min",
                "main.temp_max",
                "main.pressure",
                "main.humidity",
                "sys.type",
                "sys.id",
                "sys.country",
                "sys.sunrise",
                "sys.sunset",
                "wind.speed",
                "wind.deg",
            )
        ],
        *[
            col("weather")[0][field].alias(f"weather_{field}")
            for field in ("main", "description", "icon")
        ]
    )
    .drop("clouds", "coord", "main", "rain", "sys", "wind", "weather")
)




## Adding a load timestamp and converting temperatures to Celsius

In [158]:
# Convert temperature values to Celsius and add a timestamp.
#
# Offset between the kelvin and Celsius scales (0 degrees C = 273.15 K).
# NOTE(review): this assumes the API returned kelvin, which is what a request
# without a `units` parameter is expected to give - confirm if units are ever set.
KELVIN_OFFSET = 273.15

# withColumn() replaces a column that already exists, so re-running this cell
# recomputes the derived columns instead of duplicating them.
flattened_weather_df = (
    flattened_weather_df
    .withColumn("main_temp_celsius", col("main_temp") - KELVIN_OFFSET)
    .withColumn("main_feels_like_celsius", col("main_feels_like") - KELVIN_OFFSET)
    .withColumn("main_temp_min_celsius", col("main_temp_min") - KELVIN_OFFSET)
    .withColumn("main_temp_max_celsius", col("main_temp_max") - KELVIN_OFFSET)
    # Stamp every row with the time of this load.
    .withColumn("loaded_at", current_timestamp())
)



In [159]:
# Show at most 10 rows of the flattened data, including the Celsius and
# loaded_at columns.
display(flattened_weather_df.limit(10))


## Ingesting weather data to ADLS

In [160]:
# Save weather data to Azure Data Lake Storage.
# Each run appends its rows as Parquet under "<Ingestion_base_path>weather_data".
# The path must be built from `Ingestion_base_path`, the constant defined at the
# top of the notebook; the previous spelling `Igestion_base_path` was undefined
# and raised NameError before anything was written.
(
    flattened_weather_df.write
    .mode("append")
    .format("parquet")
    .option("mergeSchema", "true")
    .save(f"{Ingestion_base_path}weather_data")
)