In [0]:
import requests
from pyspark.sql.functions import col,udf
import time
import uuid
from datetime import datetime
from pyspark.sql.types import DateType
from pyspark.sql.types import TimestampType

####Load the Previous Delta City table and Retrieve 5 data of city name column

In [0]:
# Reload the Delta table as a new DataFrame
df = spark.read.format("delta").load("/FileStore/shared_uploads/ujjwal.baral3980@kistcollege.edu.np/delta_table")
df_city=df.select("name").limit(5)
display(df_city)

name
Upardang Gadhi
Thyāngboche
Thukla
Irung
Thānkoṭ


####Create a log table

In [0]:
%sql
 CREATE SCHEMA IF NOT EXISTS fact;
 --DROP TABLE IF EXISTS fact.log_table;
 CREATE OR REPLACE TABLE  fact.log_table (
  id STRING,
  load_type STRING,
  table_name STRING,
  process_start_time TIMESTAMP,
  process_end_time TIMESTAMP,
  status STRING,
  comments STRING,
  start_date_time TIMESTAMP,
  end_date_time TIMESTAMP,
  created_on TIMESTAMP,
  created_by STRING
)
USING DELTA;


#### Extract the weather data from the OpenWeatherMap API here, for the cities present in the cities table with json format

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import  from_json
from pyspark.sql.types import StringType, StructType, StructField, DoubleType, IntegerType, LongType, ArrayType
import json

api_endpoint = "https://api.openweathermap.org/data/2.5/weather"

# Function to fetch city data and insert it into the log table
def fetch_city_data(city):
    url = f"{api_endpoint}?q={city}&appid=238efd291d8e240c4a6673962e976afd"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        return json.dumps(data)
    else:
        return None

# Apply the fetch_city_data function to fetch weather data for each city
fetch_city_data_udf = udf(fetch_city_data, StringType())
weather_data = df_city.withColumn("weather", fetch_city_data_udf(col("name").cast("string")))

# Define the schema for the JSON data
json_schema = StructType([
    StructField("coord", StructType([
        StructField("lon", StringType()),
        StructField("lat", StringType())
    ])),
    StructField("weather", ArrayType(StructType([
        StructField("id", IntegerType()),
        StructField("main", StringType()),
        StructField("description", StringType()),
        StructField("icon", StringType())
    ]))),
    StructField("base", StringType()),
    StructField("main", StructType([
        StructField("temp", DoubleType()),
        StructField("feels_like", DoubleType()),
        StructField("temp_min", DoubleType()),
        StructField("temp_max", DoubleType()),
        StructField("pressure", IntegerType()),
        StructField("humidity", IntegerType()),
        StructField("sea_level", IntegerType()),
        StructField("grnd_level", IntegerType())
    ])),
    StructField("visibility", IntegerType()),
    StructField("wind", StructType([
        StructField("speed", DoubleType()),
        StructField("deg", IntegerType()),
        StructField("gust", DoubleType())
    ])),
    StructField("clouds", StructType([
        StructField("all", IntegerType())
    ])),
    StructField("dt", LongType()),
    StructField("sys", StructType([
        StructField("country", StringType()),
        StructField("sunrise", LongType()),
        StructField("sunset", LongType())
    ])),
    StructField("timezone", IntegerType()),
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("cod", IntegerType())
])

# Apply the schema to the parsed JSON column
raw_parsed_df = weather_data.withColumn("weather_parsed", from_json(col("weather"), json_schema))

# Print the schema
raw_parsed_df.select("weather_parsed").printSchema()
#drop the string type column
raw_parsed_df=raw_parsed_df.drop("weather")

raw_parsed_df.display()


root
 |-- weather_parsed: struct (nullable = true)
 |    |-- coord: struct (nullable = true)
 |    |    |-- lon: string (nullable = true)
 |    |    |-- lat: string (nullable = true)
 |    |-- weather: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: integer (nullable = true)
 |    |    |    |-- main: string (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- icon: string (nullable = true)
 |    |-- base: string (nullable = true)
 |    |-- main: struct (nullable = true)
 |    |    |-- temp: double (nullable = true)
 |    |    |-- feels_like: double (nullable = true)
 |    |    |-- temp_min: double (nullable = true)
 |    |    |-- temp_max: double (nullable = true)
 |    |    |-- pressure: integer (nullable = true)
 |    |    |-- humidity: integer (nullable = true)
 |    |    |-- sea_level: integer (nullable = true)
 |    |    |-- grnd_level: integer (nullable = true)
 |    |-- visibility: in

name,weather_parsed
Upardang Gadhi,"List(List(84.5667, 27.7667), List(List(800, Clear, clear sky, 01d)), stations, List(309.02, 306.37, 309.02, 309.02, 1001, 12, 1001, 891), 10000, List(3.46, 203, 3.03), List(1), 1685874238, List(NP, 1685834753, 1685884460), 20700, 1282621, Upardang Gadhi, 200)"
Thyāngboche,"List(List(86.7667, 27.8333), List(List(803, Clouds, broken clouds, 04d)), stations, List(280.98, 278.9, 280.98, 280.98, 1020, 74, 1020, 652), 6918, List(3.25, 210, 2.79), List(63), 1685874239, List(NP, 1685834217, 1685883940), 20700, 1282670, Thyāngboche, 200)"
Thukla,"List(List(86.8, 27.9167), List(List(802, Clouds, scattered clouds, 03d)), stations, List(276.82, 274.04, 276.82, 276.82, 1020, 75, 1020, 604), 10000, List(3.03, 213, 2.86), List(39), 1685874239, List(NP, 1685834198, 1685883943), 20700, 1282673, Thukla, 200)"
Irung,"List(List(85.5, 28.0167), List(List(801, Clouds, few clouds, 02d)), stations, List(287.14, 285.33, 287.14, 288.93, 1011, 28, 1011, 679), 10000, List(3.04, 222, 2.31), List(14), 1685874239, List(NP, 1685834496, 1685884269), 20700, 1282679, Irung, 200)"
Thānkoṭ,"List(List(85.1833, 27.6833), List(List(802, Clouds, scattered clouds, 03d)), stations, List(296.95, 296.46, 295.16, 296.95, 1012, 41, null, null), 10000, List(6.17, 200, null), List(40), 1685874240, List(NP, 1685834616, 1685884301), 20700, 1282684, Thānkoṭ, 200)"


####testing raw weather data

In [0]:
from pyspark.sql.functions import lit
import uuid
from datetime import datetime

def load_raw_table(df, id,schema,tablename, creator):

    # Add columns for unique ID and current date
    df = df.withColumn('load_run_id', lit(id))
    df = df.withColumn('created_on', lit(datetime.now()))
    df = df.withColumn('created_by', lit(creator))
    

    # Save the DataFrame as a table
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema};")
    df.write.format('delta').mode('append').saveAsTable(f"{schema}.{tablename}")




In [0]:
id = str(uuid.uuid4())
load_raw_table(raw_parsed_df, id, 'raw','weather_data',"Uzwal Baral")

In [0]:
%sql
select * from raw.weather_data

name,weather_parsed,load_run_id,created_on,created_by
Upardang Gadhi,"List(List(84.5667, 27.7667), List(List(800, Clear, clear sky, 01d)), stations, List(300.66, 299.62, 300.66, 300.66, 1002, 22, 1002, 889), 10000, List(1.21, 177, 1.3), List(0), 1685884045, List(NP, 1685834753, 1685884460), 20700, 1282621, Upardang Gadhi, 200)",279145db-6201-4cdf-b1fc-4797123a7448,2023-06-04T13:07:22.690+0000,Uzwal Baral
Thyāngboche,"List(List(86.7667, 27.8333), List(List(803, Clouds, broken clouds, 04n)), stations, List(278.31, 278.31, 278.31, 278.31, 1021, 80, 1021, 650), 10000, List(0.64, 241, 1.25), List(75), 1685884045, List(NP, 1685834217, 1685883940), 20700, 1282670, Thyāngboche, 200)",279145db-6201-4cdf-b1fc-4797123a7448,2023-06-04T13:07:22.690+0000,Uzwal Baral
Thukla,"List(List(86.8, 27.9167), List(List(803, Clouds, broken clouds, 04n)), stations, List(272.93, 272.93, 272.93, 272.93, 1024, 82, 1024, 602), 10000, List(0.86, 251, 1.59), List(72), 1685884045, List(NP, 1685834198, 1685883943), 20700, 1282673, Thukla, 200)",279145db-6201-4cdf-b1fc-4797123a7448,2023-06-04T13:07:22.690+0000,Uzwal Baral
Irung,"List(List(85.5, 28.0167), List(List(802, Clouds, scattered clouds, 03d)), stations, List(286.14, 284.72, 286.14, 287.26, 1013, 47, 1013, 675), 10000, List(0.22, 205, 0.53), List(30), 1685884046, List(NP, 1685834496, 1685884269), 20700, 1282679, Irung, 200)",279145db-6201-4cdf-b1fc-4797123a7448,2023-06-04T13:07:22.690+0000,Uzwal Baral
Thānkoṭ,"List(List(85.1833, 27.6833), List(List(801, Clouds, few clouds, 02d)), stations, List(295.28, 294.65, 294.16, 295.28, 1011, 42, null, null), 10000, List(5.14, 210, null), List(20), 1685884046, List(NP, 1685834616, 1685884301), 20700, 1282684, Thānkoṭ, 200)",279145db-6201-4cdf-b1fc-4797123a7448,2023-06-04T13:07:22.690+0000,Uzwal Baral


In [0]:
%sql
delete from raw.weather_data

num_affected_rows
5


####Testing Flatten Clean Table

In [0]:
from pyspark.sql.functions import from_unixtime, current_timestamp

def flatten_and_save_clean_table(schema, table):

    # Read the table into a DataFrame
    df = spark.read.table("raw.weather_data")

    # Flatten and select specific columns with aliases
    weather_flattened_df = df.select(
        col("name").alias("city_name"),
        col("weather_parsed.dt").alias("dt"),
        from_unixtime(col("weather_parsed.dt")).alias("date"),
        col("weather_parsed.id").alias("city_id"),
        col("weather_parsed.timezone").alias("timezone"),
        col("weather_parsed.sys.country").alias("country"),
        col("weather_parsed.coord.lat").alias("lat"),
        col("weather_parsed.coord.lon").alias("lon"),
        col("weather_parsed.main.temp").alias("temp"),
        col("weather_parsed.main.temp_min").alias("temp_min"),
        col("weather_parsed.main.temp_max").alias("temp_max"),
        col("weather_parsed.main.pressure").alias("pressure"),
        col("weather_parsed.main.humidity").alias("humidity"),
        col("weather_parsed.visibility").alias("numeric"),
        col("weather_parsed.wind.speed").alias("wind_speed"),
        col("weather_parsed.wind.deg").alias("wind_deg"),
        col("weather_parsed.wind.gust").alias("wind_gust"),
        col("weather_parsed.clouds.all").alias("clouds_all"),
        current_timestamp().alias("created_on"),
        col("load_run_id")
    )

    # Save the DataFrame as a table
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema};")
    weather_flattened_df.write.format("delta").mode("append").saveAsTable(f"{schema}.{table}")





In [0]:
# Call the function to execute the code and save the table
flatten_and_save_clean_table("fact", "clean_data")

In [0]:
%sql 
select * from fact.clean_data

city_name,dt,date,city_id,timezone,country,lat,lon,temp,temp_min,temp_max,pressure,humidity,numeric,wind_speed,wind_deg,wind_gust,clouds_all,created_on,load_run_id
Upardang Gadhi,1685878820,2023-06-04 11:40:20,1282621,20700,NP,27.7667,84.5667,306.21,306.21,306.21,1001,15,10000,2.29,195,3.06,0,2023-06-04T11:46:25.168+0000,ff346e29-fdea-4e49-ae71-cc2a00fc1fe6
Thyāngboche,1685878820,2023-06-04 11:40:20,1282670,20700,NP,27.8333,86.7667,279.17,279.17,279.17,1021,78,10000,2.04,223,1.96,66,2023-06-04T11:46:25.168+0000,ff346e29-fdea-4e49-ae71-cc2a00fc1fe6
Thukla,1685878820,2023-06-04 11:40:20,1282673,20700,NP,27.9167,86.8,274.41,274.41,274.41,1022,82,10000,2.03,216,2.07,44,2023-06-04T11:46:25.168+0000,ff346e29-fdea-4e49-ae71-cc2a00fc1fe6
Irung,1685878821,2023-06-04 11:40:21,1282679,20700,NP,28.0167,85.5,287.14,287.14,288.37,1011,34,10000,2.23,211,2.29,27,2023-06-04T11:46:25.168+0000,ff346e29-fdea-4e49-ae71-cc2a00fc1fe6
Thānkoṭ,1685878821,2023-06-04 11:40:21,1282684,20700,NP,27.6833,85.1833,296.39,295.16,296.39,1011,40,10000,5.14,220,,40,2023-06-04T11:46:25.168+0000,ff346e29-fdea-4e49-ae71-cc2a00fc1fe6


In [0]:
%sql 
delete from fact.clean_data

num_affected_rows
5


####Tesing load Data

In [0]:
def load_log_table(schema : str, table : str, **kwargs):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema};")
    json_rdd = spark.sparkContext.parallelize([kwargs])
    df = spark.read.json(json_rdd)
    
        
    timestamp_cols = ["process_start_time", "start_date_time"]
    for col_name in timestamp_cols:
        df = df.withColumn(col_name, col(col_name).cast(TimestampType()))    
        
    # Add columns with current datetime values as strings
    current_datetime = datetime.now()
    df = df.withColumn("process_end_time", lit(current_datetime))
    df = df.withColumn("end_date_time", lit(current_datetime))
    df = df.withColumn("created_on", lit(current_datetime))

    
 
    df.write.format('delta').mode('append').option("mergeSchema", "true").saveAsTable(f"{schema}.{table}")

In [0]:


load_log_table('FACT', 'log_table', 
                id = str(uuid.uuid4()),
                load_type = "Incremental",
                table_name = "sales_data",
                process_start_time = str(datetime.now()),
                status = "Completed",
                comments = "Data processed successfully",
                end_date_time = str(datetime.now()),
                created_by = "Uzwal"
              )

In [0]:
%sql
select * from FACT.log_table;

id,load_type,table_name,process_start_time,process_end_time,status,comments,start_date_time,end_date_time,created_on,created_by


In [0]:
%sql
delete from FACT.log_table;

num_affected_rows
1


##Main PipeLine

In [0]:
import time

for _ in range(10):
    
    # Loading raw.raw_weather_data table
    args = {
        'start_date_time': str(datetime.now()),
        'id': str(uuid.uuid4()),
        'table_name': 'weather_data',
        'load_type': 'raw_load',
        'status': 'COMPLETED',
        'created_by': 'Ujjwal Baral',
        'comments': '',
        'process_start_time': str(datetime.now())
    }

    try:
        load_raw_table(raw_parsed_df, id=args['id'], schema='raw', tablename=args['table_name'], creator=args['created_by'])
    except Exception as e:
        args['status'] = 'ERROR'
        args['comments'] = str(e)

    load_log_table('FACT', 'log_table', **args)
    

    
    cleanArgs = {
        'start_date_time': str(datetime.now()),
        'id': str(uuid.uuid4()),
        'table_name': 'Weather_clean_data',
        'load_type': 'clean_load',
        'status': 'COMPLETED',
        'created_by': 'Ujjwal Baral',
        'comments': '',
        'process_start_time': str(datetime.now())
    }
  
    try:
        flatten_and_save_clean_table(schema='fact', table=args['table_name'])
    except Exception as e:
        args['status'] = 'ERROR'
        args['comments'] = str(e)
        
    load_log_table('FACT', 'log_table', **cleanArgs)
    



In [0]:
%sql
select * from fact.log_table

id,load_type,table_name,process_start_time,process_end_time,status,comments,start_date_time,end_date_time,created_on,created_by
28a89375-4c32-48dc-bd33-8908524732f7,clean_load,Weather_clean_data,2023-06-04T16:26:56.479+0000,2023-06-04T16:27:11.166+0000,COMPLETED,,2023-06-04T16:26:56.479+0000,2023-06-04T16:27:11.166+0000,2023-06-04T16:27:11.166+0000,Ujjwal Baral
1914f4ae-5871-4aa1-8801-ba9cc6216014,clean_load,Weather_clean_data,2023-06-04T16:24:38.766+0000,2023-06-04T16:24:52.929+0000,COMPLETED,,2023-06-04T16:24:38.766+0000,2023-06-04T16:24:52.929+0000,2023-06-04T16:24:52.929+0000,Ujjwal Baral
cefd8467-78ea-4f3d-82c3-068adca96a73,clean_load,Weather_clean_data,2023-06-04T16:25:07.622+0000,2023-06-04T16:25:21.073+0000,COMPLETED,,2023-06-04T16:25:07.622+0000,2023-06-04T16:25:21.073+0000,2023-06-04T16:25:21.073+0000,Ujjwal Baral
be36c8d4-225d-4fe5-8716-64e6b0a6239b,clean_load,Weather_clean_data,2023-06-04T16:26:29.505+0000,2023-06-04T16:26:43.844+0000,COMPLETED,,2023-06-04T16:26:29.505+0000,2023-06-04T16:26:43.844+0000,2023-06-04T16:26:43.844+0000,Ujjwal Baral
4242be40-ad92-4971-8b40-c6ba4c74d58f,clean_load,Weather_clean_data,2023-06-04T16:25:35.610+0000,2023-06-04T16:25:49.413+0000,COMPLETED,,2023-06-04T16:25:35.610+0000,2023-06-04T16:25:49.413+0000,2023-06-04T16:25:49.413+0000,Ujjwal Baral
b3378156-bcc8-4493-815c-6a92b8eade26,clean_load,Weather_clean_data,2023-06-04T16:26:01.865+0000,2023-06-04T16:26:16.720+0000,COMPLETED,,2023-06-04T16:26:01.865+0000,2023-06-04T16:26:16.720+0000,2023-06-04T16:26:16.720+0000,Ujjwal Baral
60f46e77-d6f5-4c9c-a59e-8bcf16222147,clean_load,Weather_clean_data,2023-06-04T16:24:12.315+0000,2023-06-04T16:24:26.293+0000,COMPLETED,,2023-06-04T16:24:12.315+0000,2023-06-04T16:24:26.293+0000,2023-06-04T16:24:26.293+0000,Ujjwal Baral
023757d0-7c29-4627-84d4-b8619de49250,clean_load,Weather_clean_data,2023-06-04T16:27:26.620+0000,2023-06-04T16:27:40.105+0000,COMPLETED,,2023-06-04T16:27:26.620+0000,2023-06-04T16:27:40.105+0000,2023-06-04T16:27:40.105+0000,Ujjwal Baral
54b7eda3-6c0a-463a-b5ef-8058afd6d786,clean_load,Weather_clean_data,2023-06-04T16:27:55.770+0000,2023-06-04T16:28:10.978+0000,COMPLETED,,2023-06-04T16:27:55.770+0000,2023-06-04T16:28:10.978+0000,2023-06-04T16:28:10.978+0000,Ujjwal Baral
90d64147-4e81-47ed-af7b-573247846f6c,clean_load,Weather_clean_data,2023-06-04T16:28:23.376+0000,2023-06-04T16:28:37.746+0000,COMPLETED,,2023-06-04T16:28:23.376+0000,2023-06-04T16:28:37.746+0000,2023-06-04T16:28:37.746+0000,Ujjwal Baral


####Clean Data Table

In [0]:
%sql
select * from fact.Weather_clean_dataweather_data

city_name,dt,date,city_id,timezone,country,lat,lon,temp,temp_min,temp_max,pressure,humidity,numeric,wind_speed,wind_deg,wind_gust,clouds_all,created_on,load_run_id
Upardang Gadhi,1685894298,2023-06-04 15:58:18,1282621,20700,NP,27.7667,84.5667,297.05,297.05,297.05,1004,26,10000,1.99,54,1.78,0,2023-06-04T16:11:25.466+0000,b94bc649-7bfd-443e-9654-64813d447c5d
Thyāngboche,1685894298,2023-06-04 15:58:18,1282670,20700,NP,27.8333,86.7667,276.35,276.35,276.35,1024,71,10000,1.48,33,1.24,46,2023-06-04T16:11:25.466+0000,b94bc649-7bfd-443e-9654-64813d447c5d
Thukla,1685894299,2023-06-04 15:58:19,1282673,20700,NP,27.9167,86.8,270.8,270.8,270.8,1027,68,10000,1.41,27,1.62,43,2023-06-04T16:11:25.466+0000,b94bc649-7bfd-443e-9654-64813d447c5d
Irung,1685894299,2023-06-04 15:58:19,1282679,20700,NP,28.0167,85.5,284.14,284.14,284.14,1016,44,10000,3.7,28,2.96,8,2023-06-04T16:11:25.466+0000,b94bc649-7bfd-443e-9654-64813d447c5d
Thānkoṭ,1685894029,2023-06-04 15:53:49,1282684,20700,NP,27.6833,85.1833,292.16,292.16,292.16,1013,47,10000,2.06,200,,20,2023-06-04T16:11:25.466+0000,b94bc649-7bfd-443e-9654-64813d447c5d
Upardang Gadhi,1685892756,2023-06-04 15:32:36,1282621,20700,NP,27.7667,84.5667,297.05,297.05,297.05,1004,26,10000,1.99,54,1.78,0,2023-06-04T16:11:25.466+0000,e3fda6cb-0ce9-4e44-9076-88c5c9e86820
Thyāngboche,1685892757,2023-06-04 15:32:37,1282670,20700,NP,27.8333,86.7667,276.35,276.35,276.35,1024,71,10000,1.48,33,1.24,46,2023-06-04T16:11:25.466+0000,e3fda6cb-0ce9-4e44-9076-88c5c9e86820
Thukla,1685892757,2023-06-04 15:32:37,1282673,20700,NP,27.9167,86.8,270.8,270.8,270.8,1027,68,10000,1.41,27,1.62,43,2023-06-04T16:11:25.466+0000,e3fda6cb-0ce9-4e44-9076-88c5c9e86820
Irung,1685892757,2023-06-04 15:32:37,1282679,20700,NP,28.0167,85.5,284.14,284.14,284.14,1016,44,10000,3.7,28,2.96,8,2023-06-04T16:11:25.466+0000,e3fda6cb-0ce9-4e44-9076-88c5c9e86820
Thānkoṭ,1685892742,2023-06-04 15:32:22,1282684,20700,NP,27.6833,85.1833,292.16,292.16,292.16,1013,47,10000,2.06,200,,20,2023-06-04T16:11:25.466+0000,e3fda6cb-0ce9-4e44-9076-88c5c9e86820
