In [None]:
import os
import uuid
from datetime import datetime

import requests
from pyspark.sql.functions import col, date_format, explode, from_unixtime, lit, to_date, to_timestamp, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType, ArrayType
from delta.tables import *

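#### Logger Notebook: loaded by the `%run` cell below; it is expected to define the `keep_log` decorator used later in this notebook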

In [None]:
%run "/Users/075bei015.kshitiz@pcampus.edu.np/DB6-Log-table"

In [None]:
# Spark schema used as the return type of the weather-fetching UDF in
# get_weather. It lists the fields kept from the JSON payload returned by the
# weather endpoint; every field is nullable, so a key that is absent from a
# response simply becomes null.
json_schema = (
    StructType()
    .add('visibility', IntegerType(), True)
    .add('timezone', IntegerType(), True)
    # Core measurements (temperature, pressure, humidity).
    .add('main', StructType()
         .add('temp', FloatType(), True)
         .add('feels_like', FloatType(), True)
         .add('temp_min', FloatType(), True)
         .add('temp_max', FloatType(), True)
         .add('pressure', IntegerType(), True)
         .add('humidity', IntegerType(), True)
         .add('sea_level', IntegerType(), True)
         .add('grnd_level', IntegerType(), True))
    .add('clouds', StructType()
         .add('all', IntegerType(), True))
    .add('sys', StructType()
         .add('country', StringType(), True)
         .add('sunrise', IntegerType(), True)
         .add('sunset', IntegerType(), True))
    # Observation time as epoch seconds (consumed via datetime.fromtimestamp /
    # from_unixtime further down the notebook).
    .add('dt', IntegerType(), True)
    .add('coord', StructType()
         .add('lon', FloatType(), True)
         .add('lat', FloatType(), True))
    .add('name', StringType(), True)
    # 'weather' is a list of condition entries, each a small struct.
    .add('weather', ArrayType(
        StructType()
        .add('id', IntegerType(), True)
        .add('main', StringType(), True)
        .add('description', StringType(), True)
        .add('icon', StringType(), True),
        True))
    .add('cod', IntegerType(), True)
    .add('id', IntegerType(), True)
    .add('wind', StructType()
         .add('speed', FloatType(), True)
         .add('deg', IntegerType(), True)
         .add('gust', FloatType(), True))
    .add('base', StringType(), True)
)

In [None]:
# Read the key from the environment when it is provided; the original literal
# placeholder is kept as the fallback so behaviour is unchanged when the
# variable is not set.
API_key = os.environ.get("OPENWEATHER_API_KEY", "{your_api_key}")

# Upper bound (seconds) for a single HTTP request so that a stalled connection
# cannot block an executor task forever.
WEATHER_REQUEST_TIMEOUT_SECONDS = 30


@keep_log
def get_weather(df):
    """Fetch the current weather for every city id in ``df``.

    For each row, the value of the ``id`` column is sent to the OpenWeatherMap
    ``/data/2.5/weather`` endpoint and the JSON reply is stored, shaped by
    ``json_schema``, in a single struct column named ``result``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input frame; must contain an ``id`` column holding the city ids.

    Returns
    -------
    tuple
        ``(weather_df, start, end)`` where ``weather_df`` has the single column
        ``result`` and ``start`` / ``end`` are ``datetime`` objects built from
        the smallest and largest ``result.dt`` values.

    Raises
    ------
    ValueError
        If no row carries a non-null ``result.dt`` (empty input, or every API
        call returned a payload without ``dt``).

    Notes
    -----
    * The function is wrapped by ``keep_log`` (defined in the logger notebook);
      judging by the example call in this notebook the decorated function is
      invoked with extra leading arguments -- TODO confirm against keep_log.
    * The result is cached because the UDF performs a network call: without
      caching, every Spark action on the returned frame would call the API
      again for every row and could observe different data each time.
    * NOTE(review): PySpark does not coerce Python ``int`` to ``FloatType`` (or
      ``float`` to ``IntegerType``) in UDF results; such values presumably end
      up as null. Worth confirming if the API can return whole-number floats.
    """

    def fetch_weather_data(city_id):
        # Runs on the executors, one HTTP call per input row.
        url = f'https://api.openweathermap.org/data/2.5/weather?id={city_id}&appid={API_key}'
        response = requests.get(url, timeout=WEATHER_REQUEST_TIMEOUT_SECONDS)
        return response.json()

    # Mark the UDF non-deterministic so the optimizer does not duplicate or
    # re-order the network calls.
    fetch_weather_udf = udf(fetch_weather_data, json_schema).asNondeterministic()

    weather_df = (
        df.withColumn('result', fetch_weather_udf(col('id')))
          .select('result')
          .cache()
    )

    # One aggregation yields both bounds (and materializes the cache) instead
    # of launching two separate jobs.
    bounds = weather_df.selectExpr("min(result.dt)", "max(result.dt)").first()
    if bounds is None or bounds[0] is None or bounds[1] is None:
        raise ValueError(
            "get_weather: no weather record with a non-null 'dt' was returned; "
            "check the input ids and the API key"
        )

    # fromtimestamp interprets the epoch seconds in the driver's local timezone.
    start = datetime.fromtimestamp(bounds[0])
    end = datetime.fromtimestamp(bounds[1])

    return weather_df, start, end

In [None]:
# df_raw = get_weather('RAW', 'raw_weather', df)

In [None]:
# display(df_raw)

result,load_run_id,created_on,created_by
"List(10000, 20700, List(312.44, 310.13, 312.44, 312.44, 1000, 14, 1000, 919), List(12), List(NP, 1686180491, 1686230388), 1686210994, List(83.7667, 27.9833), Wāliṅ, List(List(801, Clouds, few clouds, 02d)), 200, 1282616, List(3.24, 155, 2.75), stations)",664bb0a8-7dca-40ab-a222-4ef33103648e,2023-06-08T08:01:57.493+0000,Kshitiz Dhakal
"List(10000, 20700, List(311.1, 308.52, 311.1, 311.1, 999, 13, 999, 890), List(12), List(NP, 1686180329, 1686230167), 1686210999, List(84.5667, 27.7667), Upardang Gadhi, List(List(801, Clouds, few clouds, 02d)), 200, 1282621, List(4.64, 208, 3.79), stations)",664bb0a8-7dca-40ab-a222-4ef33103648e,2023-06-08T08:01:57.493+0000,Kshitiz Dhakal
"List(10000, 20700, List(311.66, 309.44, 311.66, 311.66, 1001, 15, 1001, 931), List(35), List(NP, 1686180824, 1686230761), 1686210992, List(82.2973, 28.131), Tulsīpur, List(List(802, Clouds, scattered clouds, 03d)), 200, 1282635, List(6.25, 231, 5.56), stations)",664bb0a8-7dca-40ab-a222-4ef33103648e,2023-06-08T08:01:57.493+0000,Kshitiz Dhakal
"List(10000, 20700, List(316.94, 314.72, 316.94, 316.94, 998, 12, 998, 977), List(2), List(NP, 1686180363, 1686230165), 1686211322, List(84.5, 27.6333), Tikoli, List(List(800, Clear, clear sky, 01d)), 200, 1282665, List(4.71, 202, 3.89), stations)",664bb0a8-7dca-40ab-a222-4ef33103648e,2023-06-08T08:01:57.493+0000,Kshitiz Dhakal
"List(10000, 20700, List(316.76, 313.8, 316.76, 316.76, 999, 10, 999, 982), List(0), List(NP, 1686181053, 1686231091), 1686210995, List(81.1333, 28.5), Ṭikāpur, List(List(800, Clear, clear sky, 01d)), 200, 1282666, List(2.59, 219, 3.05), stations)",664bb0a8-7dca-40ab-a222-4ef33103648e,2023-06-08T08:01:57.493+0000,Kshitiz Dhakal


In [None]:
@keep_log
def get_process(df):
    """Flatten the nested ``result`` struct into one column per attribute.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame with a ``result`` struct column shaped like ``json_schema``
        (the output of ``get_weather``).

    Returns
    -------
    tuple
        ``(flat_df, start, end)`` where ``flat_df`` holds the selected scalar
        columns (``dt``, ``date_time``, ``date``, ``time``, ``city_id``, ...)
        and ``start`` / ``end`` are ``datetime`` objects built from the
        smallest and largest ``dt`` values.

    Raises
    ------
    ValueError
        If the frame contains no row with a non-null ``dt``.

    Notes
    -----
    The function is wrapped by ``keep_log`` (defined in the logger notebook).
    """
    # Build the timestamp expression once and reuse it for the derived columns.
    # Referring to the 'date_time' alias inside the same select only works on
    # runtimes that support lateral column aliases.
    date_time = from_unixtime(col('result.dt'))

    df = df.select(
            col('result.dt'),
            date_time.alias('date_time'),
            to_date(date_time).alias('date'),
            date_format(date_time, 'HH:mm:ss').alias('time'),
            col('result.id').alias('city_id'),
            col('result.name').alias('city_name'),
            col('result.timezone'),
            col('result.sys.country'),
            col('result.coord.lat'),
            col('result.coord.lon'),
            col('result.main.temp'),
            col('result.main.temp_min'),
            col('result.main.temp_max'),
            col('result.main.pressure'),
            col('result.main.humidity'),
            col('result.visibility'),
            col('result.wind.speed').alias("wind_speed"),
            col('result.wind.deg').alias("wind_deg"),
            col('result.wind.gust').alias("wind_gust"),
            col('result.clouds.all').alias("clouds_all"))

    # A single aggregation returns both bounds instead of running two jobs.
    bounds = df.selectExpr("min(dt)", "max(dt)").first()
    if bounds is None or bounds[0] is None or bounds[1] is None:
        raise ValueError(
            "get_process: input contains no row with a non-null 'dt'"
        )

    # NOTE(review): fromtimestamp uses the driver's local timezone while
    # from_unixtime uses the Spark session timezone -- confirm they agree.
    start = datetime.fromtimestamp(bounds[0])
    end = datetime.fromtimestamp(bounds[1])

    return df, start, end

In [None]:
# df_cleansed_data = get_process('PROCESSED', 'cleansed_weather', df_raw)

In [None]:
# display(df_cleansed_data)

dt,date_time,date,time,city_id,city_name,timezone,country,lat,lon,temp,temp_min,temp_max,pressure,humidity,visibility,wind_speed,wind_deg,wind_gust,clouds_all,load_run_id,created_on,created_by
1686210991,2023-06-08 07:56:31,2023-06-08,07:56:31,1282616,Wāliṅ,20700,NP,27.9833,83.7667,312.44,312.44,312.44,1000,14,10000,3.24,155,2.75,12,ce54e4fc-1136-44ac-bc66-7a8bd5274680,2023-06-08T08:04:19.169+0000,Kshitiz Dhakal
1686210991,2023-06-08 07:56:31,2023-06-08,07:56:31,1282621,Upardang Gadhi,20700,NP,27.7667,84.5667,311.1,311.1,311.1,999,13,10000,4.64,208,3.79,12,ce54e4fc-1136-44ac-bc66-7a8bd5274680,2023-06-08T08:04:19.169+0000,Kshitiz Dhakal
1686210992,2023-06-08 07:56:32,2023-06-08,07:56:32,1282635,Tulsīpur,20700,NP,28.131,82.2973,311.66,311.66,311.66,1001,15,10000,6.25,231,5.56,35,ce54e4fc-1136-44ac-bc66-7a8bd5274680,2023-06-08T08:04:19.169+0000,Kshitiz Dhakal
1686211322,2023-06-08 08:02:02,2023-06-08,08:02:02,1282665,Tikoli,20700,NP,27.6333,84.5,316.94,316.94,316.94,998,12,10000,4.71,202,3.89,2,ce54e4fc-1136-44ac-bc66-7a8bd5274680,2023-06-08T08:04:19.169+0000,Kshitiz Dhakal
1686210992,2023-06-08 07:56:32,2023-06-08,07:56:32,1282666,Ṭikāpur,20700,NP,28.5,81.1333,316.76,316.76,316.76,999,10,10000,2.59,219,3.05,0,ce54e4fc-1136-44ac-bc66-7a8bd5274680,2023-06-08T08:04:19.169+0000,Kshitiz Dhakal
