#### a) Generate a Data Model and show relationship between tables. Describe each attribute.
![Weather Data Model](./images/ER.webp)

In [16]:
!pip install pyspark



Importing necessary libraries

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, year, hour, avg
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window

In [18]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook.
# - master "local[*]": run Spark locally
# - the Spark UI port is set to 4050
# - spark.sql.autoBroadcastJoinThreshold is set to "-1"
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PROJECT_WEATHER_DATASETS")
    .config('spark.ui.port', '4050')
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .getOrCreate()
)

- b) Generate a fill rate report for all columns (meaning: how many records in each column have proper values).
- c) Data Cleaning (For ex: Null Values, Empty Values, string value in integer columns etc. should be removed)
- d) Remove logically incorrect Data 
- e) Show data cleanliness percentage (i.e. number_of_records_post_cleaning/source_count *100)

#### for cities dataset

In [19]:
# Load the cities CSV; the first row is treated as the header and fields are comma-separated.
cities = (
    spark.read
    .option("Header", True)
    .option("delimiter", ",")
    .csv('Datasets/cities.csv')
)
cities.show()

+------------+----------+----------+
|   city_name|  latitude| longitude|
+------------+----------+----------+
|   Abu Dhabi| 24.466669| 54.366669|
|   Amsterdam| 52.374031|   4.88969|
|      Athens|  37.97945| 23.716221|
|     Atlanta| 33.749001|-84.387978|
|    Auckland| -36.85582|174.743042|
|      Austin| 34.998421|-91.983757|
|     Bangkok|  13.87719| 100.71991|
|   Barcelona|  41.38879|   2.15899|
|     Beijing| 39.907501|116.397232|
|      Berlin| 54.033329|     10.45|
|      Bilbao| 43.262711|  -2.92528|
|  Birmingham| 52.481419|  -1.89983|
|      Bogota|  4.624335|-74.063644|
|      Boston| 30.791861|-83.789886|
|    Brisbane|-27.467939|153.028091|
|    Brussels| 50.850449|   4.34878|
|   Bucharest| 44.432251|  26.10626|
|    Budapest| 47.497913| 19.040236|
|Buenos Aires|-34.603722|-58.381592|
|       Busan| 35.133331|129.050003|
+------------+----------+----------+
only showing top 20 rows



In [20]:


# Denominator for every fill rate: the total number of rows in the dataset.
total_records = cities.count()

# A value is considered "filled" when it is neither null nor an empty string.
# The fill rate of a column is the percentage of rows whose value is filled.
fill_rate_report = cities.select(
    *[
        (
            F.count(F.when(F.col(name).isNotNull() & (F.col(name) != ''), name))
            / total_records
            * 100
        ).alias(f"{name}_fill_rate")
        for name in cities.columns
    ]
)

# One row, one "<column>_fill_rate" column per source column.
fill_rate_report.show()


+-------------------+------------------+-------------------+
|city_name_fill_rate|latitude_fill_rate|longitude_fill_rate|
+-------------------+------------------+-------------------+
|              100.0|             100.0|              100.0|
+-------------------+------------------+-------------------+



In [21]:

# Step 1: Empty strings carry no information, so turn them into nulls in every column.
blank_to_null = [
    when(col(name) != "", col(name)).otherwise(None).alias(name)
    for name in cities.columns
]
cities_cleaned = cities.select(*blank_to_null)

# Step 2: A city row is only usable when its name and both coordinates are present.
cities_cleaned = cities_cleaned.dropna(subset=["city_name", "latitude", "longitude"])

# Steps 3 and 4: Cast each coordinate to double, then discard rows where the cast
# produced null (i.e. the original text was not a number).
numeric_columns = ["latitude", "longitude"]
for column in numeric_columns:
    cities_cleaned = (
        cities_cleaned
        .withColumn(column, col(column).cast(DoubleType()))
        .filter(col(column).isNotNull())
    )

# Step 5: Preview the cleaned dataset.
cities_cleaned.show()


+------------+----------+----------+
|   city_name|  latitude| longitude|
+------------+----------+----------+
|   Abu Dhabi| 24.466669| 54.366669|
|   Amsterdam| 52.374031|   4.88969|
|      Athens|  37.97945| 23.716221|
|     Atlanta| 33.749001|-84.387978|
|    Auckland| -36.85582|174.743042|
|      Austin| 34.998421|-91.983757|
|     Bangkok|  13.87719| 100.71991|
|   Barcelona|  41.38879|   2.15899|
|     Beijing| 39.907501|116.397232|
|      Berlin| 54.033329|     10.45|
|      Bilbao| 43.262711|  -2.92528|
|  Birmingham| 52.481419|  -1.89983|
|      Bogota|  4.624335|-74.063644|
|      Boston| 30.791861|-83.789886|
|    Brisbane|-27.467939|153.028091|
|    Brussels| 50.850449|   4.34878|
|   Bucharest| 44.432251|  26.10626|
|    Budapest| 47.497913| 19.040236|
|Buenos Aires|-34.603722|-58.381592|
|       Busan| 35.133331|129.050003|
+------------+----------+----------+
only showing top 20 rows



In [22]:

# Remove logically incorrect rows: latitude must lie in [-90, 90] and
# longitude in [-180, 180] (both bounds inclusive).
latitude_in_range = col("latitude").between(-90, 90)
longitude_in_range = col("longitude").between(-180, 180)
cities_cleaned = cities_cleaned.filter(latitude_in_range & longitude_in_range)

# Preview the result.
cities_cleaned.show()


+------------+----------+----------+
|   city_name|  latitude| longitude|
+------------+----------+----------+
|   Abu Dhabi| 24.466669| 54.366669|
|   Amsterdam| 52.374031|   4.88969|
|      Athens|  37.97945| 23.716221|
|     Atlanta| 33.749001|-84.387978|
|    Auckland| -36.85582|174.743042|
|      Austin| 34.998421|-91.983757|
|     Bangkok|  13.87719| 100.71991|
|   Barcelona|  41.38879|   2.15899|
|     Beijing| 39.907501|116.397232|
|      Berlin| 54.033329|     10.45|
|      Bilbao| 43.262711|  -2.92528|
|  Birmingham| 52.481419|  -1.89983|
|      Bogota|  4.624335|-74.063644|
|      Boston| 30.791861|-83.789886|
|    Brisbane|-27.467939|153.028091|
|    Brussels| 50.850449|   4.34878|
|   Bucharest| 44.432251|  26.10626|
|    Budapest| 47.497913| 19.040236|
|Buenos Aires|-34.603722|-58.381592|
|       Busan| 35.133331|129.050003|
+------------+----------+----------+
only showing top 20 rows



In [23]:
# Row counts before and after cleaning.
original_count = cities.count()
cleaned_count = cities_cleaned.count()

# Cleanliness = share of source rows that survived cleaning, as a percentage.
surviving_fraction = cleaned_count / original_count
cleanliness_percentage = surviving_fraction * 100

print(f"Data Cleanliness Percentage: {cleanliness_percentage:.2f}%")


Data Cleanliness Percentage: 100.00%


#### for daily_data_combined_1960_to_1969 dataset

In [24]:
# Load the combined daily weather data for 1960-1969; the first row is the header.
# The directory is spelled 'Datasets' to match every other read in this notebook
# (the previous 'DataSets' spelling only resolves on case-insensitive filesystems).
daily_data_1960_1969 = (
    spark.read
    .option("Header", True)
    .option("delimiter", ",")
    .csv('Datasets/daily_data_combined_1960_to_1969.csv')
)
daily_data_1960_1969.show()

+---------+----------+------------+------------------+------------------+-------------------+------------------------+------------------------+-------------------------+----------------+----------------+-----------------+-----------------+-----------------+--------+------------+-------------------+------------------+------------------+---------------------------+-----------------------+--------------------------+
|city_name|  datetime|weather_code|temperature_2m_max|temperature_2m_min|temperature_2m_mean|apparent_temperature_max|apparent_temperature_min|apparent_temperature_mean|         sunrise|          sunset|daylight_duration|sunshine_duration|precipitation_sum|rain_sum|snowfall_sum|precipitation_hours|wind_speed_10m_max|wind_gusts_10m_max|wind_direction_10m_dominant|shortwave_radiation_sum|et0_fao_evapotranspiration|
+---------+----------+------------+------------------+------------------+-------------------+------------------------+------------------------+-----------------------

In [25]:


# Total number of rows; used as the denominator of each fill rate.
total_records = daily_data_1960_1969.count()

# Build one percentage expression per column: rows whose value is neither
# null nor an empty string, divided by the total row count.
fill_rate_columns = []
for column in daily_data_1960_1969.columns:
    is_filled = col(column).isNotNull() & (col(column) != '')
    fill_rate = count(when(is_filled, column)) / total_records * 100
    fill_rate_columns.append(fill_rate.alias(f"{column}_fill_rate"))

fill_rate_report = daily_data_1960_1969.select(*fill_rate_columns)

# Display the single-row report.
fill_rate_report.show()


+-------------------+------------------+----------------------+----------------------------+----------------------------+-----------------------------+----------------------------------+----------------------------------+-----------------------------------+-----------------+----------------+---------------------------+---------------------------+---------------------------+------------------+----------------------+-----------------------------+----------------------------+----------------------------+-------------------------------------+---------------------------------+------------------------------------+
|city_name_fill_rate|datetime_fill_rate|weather_code_fill_rate|temperature_2m_max_fill_rate|temperature_2m_min_fill_rate|temperature_2m_mean_fill_rate|apparent_temperature_max_fill_rate|apparent_temperature_min_fill_rate|apparent_temperature_mean_fill_rate|sunrise_fill_rate|sunset_fill_rate|daylight_duration_fill_rate|sunshine_duration_fill_rate|precipitation_sum_fill_rate|rain_sum

In [26]:
# Step 1: Normalise empty strings to null in EVERY column.
# This has to happen before dropna(); previously it ran afterwards and only on
# temperature_2m_max, so it never removed anything and blanks in other columns survived.
daily_data_1960_1969_cleaned = daily_data_1960_1969.select(
    *[
        when(col(column) == "", None).otherwise(col(column)).alias(column)
        for column in daily_data_1960_1969.columns
    ]
)

# Step 2: Drop rows that have a null (including former empty strings) in any column.
daily_data_1960_1969_cleaned = daily_data_1960_1969_cleaned.dropna()

# Step 3: Drop rows whose temperature columns hold non-numeric text.
# A failed cast to double yields null, which marks the value as invalid.
daily_data_1960_1969_cleaned = daily_data_1960_1969_cleaned.filter(
    col('temperature_2m_max').cast('double').isNotNull() &
    col('temperature_2m_min').cast('double').isNotNull() &
    col('temperature_2m_mean').cast('double').isNotNull()
)

# Step 4: Remove exact duplicate rows.
daily_data_1960_1969_cleaned = daily_data_1960_1969_cleaned.dropDuplicates()

# Step 5: Preview the cleaned data.
daily_data_1960_1969_cleaned.show()

+---------+----------+------------+------------------+------------------+-------------------+------------------------+------------------------+-------------------------+----------------+----------------+-----------------+-----------------+-----------------+--------+------------+-------------------+------------------+------------------+---------------------------+-----------------------+--------------------------+
|city_name|  datetime|weather_code|temperature_2m_max|temperature_2m_min|temperature_2m_mean|apparent_temperature_max|apparent_temperature_min|apparent_temperature_mean|         sunrise|          sunset|daylight_duration|sunshine_duration|precipitation_sum|rain_sum|snowfall_sum|precipitation_hours|wind_speed_10m_max|wind_gusts_10m_max|wind_direction_10m_dominant|shortwave_radiation_sum|et0_fao_evapotranspiration|
+---------+----------+------------+------------------+------------------+-------------------+------------------------+------------------------+-----------------------

In [27]:
# 1. Sanity check: count rows of the cleaned frame that still contain a null
#    in any of the measurement columns listed below.
checked_columns = [
    "temperature_2m_max", "temperature_2m_min", "temperature_2m_mean",
    "apparent_temperature_max", "apparent_temperature_min", "apparent_temperature_mean",
    "sunrise", "sunset", "daylight_duration", "sunshine_duration",
    "precipitation_sum", "rain_sum", "snowfall_sum", "precipitation_hours",
    "wind_speed_10m_max", "wind_gusts_10m_max", "wind_direction_10m_dominant",
    "shortwave_radiation_sum", "et0_fao_evapotranspiration",
]
any_null = F.lit(False)
for column in checked_columns:
    any_null = any_null | col(column).isNull()
null_count = daily_data_1960_1969_cleaned.filter(any_null).count()

# 2. Remove rows with null values.
# Start from the cleaned frame rather than the raw one, so the numeric filter and
# de-duplication already applied are reflected in any count based on cleaned_data.
cleaned_data = daily_data_1960_1969_cleaned.na.drop()

In [28]:
# Row counts of the source data and of the data that survived cleaning.
source_count = daily_data_1960_1969.count()
cleaned_count = cleaned_data.count()

# Cleanliness = surviving rows as a percentage of source rows.
surviving_fraction = cleaned_count / source_count
cleanliness_percentage = surviving_fraction * 100

print(f"Data Cleanliness Percentage: {cleanliness_percentage}%")

Data Cleanliness Percentage: 99.99945235487404%


#### for daily_units_2010_to_2019 dataset

In [29]:
# Load the daily units CSV; the first row is the header and fields are comma-separated.
daily_units = (
    spark.read
    .option("Header", True)
    .option("delimiter", ",")
    .csv('Datasets/daily_units_2010_to_2019.csv')
)
daily_units.show()

+-------+------------+------------------+------------------+-------------------+------------------------+------------------------+-------------------------+-------+-------+-----------------+-----------------+-----------------+--------+------------+-------------------+------------------+------------------+---------------------------+-----------------------+--------------------------+
|   time|weather_code|temperature_2m_max|temperature_2m_min|temperature_2m_mean|apparent_temperature_max|apparent_temperature_min|apparent_temperature_mean|sunrise| sunset|daylight_duration|sunshine_duration|precipitation_sum|rain_sum|snowfall_sum|precipitation_hours|wind_speed_10m_max|wind_gusts_10m_max|wind_direction_10m_dominant|shortwave_radiation_sum|et0_fao_evapotranspiration|
+-------+------------+------------------+------------------+-------------------+------------------------+------------------------+-------------------------+-------+-------+-----------------+-----------------+-----------------+--

In [30]:
# Total row count, used as the denominator for each column's fill rate.
total_records = daily_units.count()

# For each column: percentage of rows whose value is neither null nor ''.
fill_rate_report = daily_units.select(
    *[
        (
            F.count(F.when(F.col(name).isNotNull() & (F.col(name) != ''), name))
            / total_records
            * 100
        ).alias(f"{name}_fill_rate")
        for name in daily_units.columns
    ]
)

# Display the single-row report.
fill_rate_report.show()


+--------------+----------------------+----------------------------+----------------------------+-----------------------------+----------------------------------+----------------------------------+-----------------------------------+-----------------+----------------+---------------------------+---------------------------+---------------------------+------------------+----------------------+-----------------------------+----------------------------+----------------------------+-------------------------------------+---------------------------------+------------------------------------+
|time_fill_rate|weather_code_fill_rate|temperature_2m_max_fill_rate|temperature_2m_min_fill_rate|temperature_2m_mean_fill_rate|apparent_temperature_max_fill_rate|apparent_temperature_min_fill_rate|apparent_temperature_mean_fill_rate|sunrise_fill_rate|sunset_fill_rate|daylight_duration_fill_rate|sunshine_duration_fill_rate|precipitation_sum_fill_rate|rain_sum_fill_rate|snowfall_sum_fill_rate|precipitation_

#### for hour_units_2010_to_2019 dataset

In [31]:
# Load the hourly units CSV; the first row is the header and fields are comma-separated.
hourly_units = (
    spark.read
    .option("Header", True)
    .option("delimiter", ",")
    .csv('Datasets/hour_units_2010_to_2019.csv')
)
hourly_units.show()

+-------+--------------+--------------------+------------+--------------------+-------------+----+--------+----------+------------+------------+----------------+-----------+---------------+---------------+----------------+--------------------------+-----------------------+--------------+---------------+------------------+-------------------+--------------+-------------------------+--------------------------+----------------------------+-----------------------------+----------------------+-----------------------+-------------------------+--------------------------+-------------------+----------------+-----------------+------------------------+------------------------+---------------------+---------------------------+------------------------+-------------------------+--------------------------------+--------------------------------+-----------------------------+
|   time|temperature_2m|relative_humidity_2m|dew_point_2m|apparent_temperature|precipitation|rain|snowfall|snow_depth|weather_c

In [32]:


# Total number of rows; every fill rate is expressed relative to this.
total_records = hourly_units.count()

# Collect one "<column>_fill_rate" expression per column. A value is counted
# when it is not null and not an empty string.
fill_rate_columns = []
for column in hourly_units.columns:
    is_filled = col(column).isNotNull() & (col(column) != '')
    fill_rate = count(when(is_filled, column)) / total_records * 100
    fill_rate_columns.append(fill_rate.alias(f"{column}_fill_rate"))

fill_rate_report = hourly_units.select(*fill_rate_columns)

# Display the single-row report.
fill_rate_report.show()


+--------------+------------------------+------------------------------+----------------------+------------------------------+-----------------------+--------------+------------------+--------------------+----------------------+----------------------+--------------------------+---------------------+-------------------------+-------------------------+--------------------------+------------------------------------+---------------------------------+------------------------+-------------------------+----------------------------+-----------------------------+------------------------+-----------------------------------+------------------------------------+--------------------------------------+---------------------------------------+--------------------------------+---------------------------------+-----------------------------------+------------------------------------+-----------------------------+--------------------------+---------------------------+----------------------------------+---

#### for hourly_data_combined_2010_to_2019 dataset

In [33]:
# Load the combined hourly weather data for 2010-2019; the first row is the header.
hourly_data_2010_2019 = (
    spark.read
    .option("Header", True)
    .option("delimiter", ",")
    .csv('Datasets/hourly_data_combined_2010_to_2019.csv')
)
hourly_data_2010_2019.show()

+---------+-------------------+--------------+--------------------+------------+--------------------+-------------+----+--------+----------+------------+------------+----------------+-----------+---------------+---------------+----------------+--------------------------+-----------------------+--------------+---------------+------------------+-------------------+--------------+-------------------------+--------------------------+----------------------------+-----------------------------+----------------------+-----------------------+-------------------------+--------------------------+-------------------+----------------+-----------------+------------------------+------------------------+---------------------+---------------------------+------------------------+-------------------------+--------------------------------+--------------------------------+-----------------------------+
|city_name|           datetime|temperature_2m|relative_humidity_2m|dew_point_2m|apparent_temperature|prec

In [34]:
# Total row count, used as the denominator for each column's fill rate.
total_records = hourly_data_2010_2019.count()

# Percentage of rows per column holding a usable value (not null, not '').
fill_rate_report = hourly_data_2010_2019.select(
    *[
        (
            F.count(F.when(F.col(name).isNotNull() & (F.col(name) != ''), name))
            / total_records
            * 100
        ).alias(f"{name}_fill_rate")
        for name in hourly_data_2010_2019.columns
    ]
)

# Display the single-row report.
fill_rate_report.show()


+-------------------+------------------+------------------------+------------------------------+----------------------+------------------------------+-----------------------+--------------+------------------+--------------------+----------------------+----------------------+--------------------------+---------------------+-------------------------+-------------------------+--------------------------+------------------------------------+---------------------------------+------------------------+-------------------------+----------------------------+-----------------------------+------------------------+-----------------------------------+------------------------------------+--------------------------------------+---------------------------------------+--------------------------------+---------------------------------+-----------------------------------+------------------------------------+-----------------------------+--------------------------+---------------------------+--------------

In [35]:
# Measurement columns; they are read from the CSV as strings and cast to float below.
numeric_columns = [
    "temperature_2m", "relative_humidity_2m", "dew_point_2m",
    "apparent_temperature", "precipitation", "rain", "snowfall",
    "snow_depth", "pressure_msl", "surface_pressure", "cloud_cover",
    "cloud_cover_low", "cloud_cover_mid", "cloud_cover_high",
    "et0_fao_evapotranspiration", "vapour_pressure_deficit", "wind_speed_10m",
    "wind_speed_100m", "wind_direction_10m", "wind_direction_100m", "wind_gusts_10m",
    "soil_temperature_0_to_7cm", "soil_temperature_7_to_28cm", "soil_temperature_28_to_100cm",
    "soil_temperature_100_to_255cm", "soil_moisture_0_to_7cm", "soil_moisture_7_to_28cm",
    "soil_moisture_28_to_100cm", "soil_moisture_100_to_255cm", "shortwave_radiation",
    "direct_radiation", "diffuse_radiation", "direct_normal_irradiance",
    "global_tilted_irradiance", "terrestrial_radiation", "shortwave_radiation_instant",
    "direct_radiation_instant", "diffuse_radiation_instant", "direct_normal_irradiance_instant",
    "global_tilted_irradiance_instant", "terrestrial_radiation_instant"
]

# Columns that are kept as text but must still be present and non-empty.
required_text_columns = ["city_name", "datetime", "weather_code"]

# Step 1: Drop rows that contain a null in any column.
hourly_data_2010_2019_cleaned = hourly_data_2010_2019.dropna()

# Step 2: Drop rows where any required column is an empty string.
# The predicate is built in a loop over the same columns that were previously
# spelled out one by one.
non_empty = F.lit(True)
for col_name in required_text_columns + numeric_columns:
    non_empty = non_empty & (F.col(col_name) != "")
hourly_data_2010_2019_cleaned = hourly_data_2010_2019_cleaned.filter(non_empty)

# Step 3: Cast the measurement columns from string to float in a single projection
# (column order is preserved; non-measurement columns pass through unchanged).
numeric_column_set = set(numeric_columns)
hourly_data_2010_2019_cleaned = hourly_data_2010_2019_cleaned.select(
    *[
        F.col(col_name).cast("float").alias(col_name)
        if col_name in numeric_column_set
        else F.col(col_name)
        for col_name in hourly_data_2010_2019_cleaned.columns
    ]
)

# Step 4: A value that is not a valid number becomes null after the cast.
# Drop those rows so string garbage in numeric columns does not survive cleaning.
hourly_data_2010_2019_cleaned = hourly_data_2010_2019_cleaned.dropna(subset=numeric_columns)

# Step 5: Remove exact duplicate rows.
hourly_data_2010_2019_cleaned = hourly_data_2010_2019_cleaned.dropDuplicates()

# Preview the cleaned data.
hourly_data_2010_2019_cleaned.show()

+---------+-------------------+--------------+--------------------+------------+--------------------+-------------+----+--------+----------+------------+------------+----------------+-----------+---------------+---------------+----------------+--------------------------+-----------------------+--------------+---------------+------------------+-------------------+--------------+-------------------------+--------------------------+----------------------------+-----------------------------+----------------------+-----------------------+-------------------------+--------------------------+-------------------+----------------+-----------------+------------------------+------------------------+---------------------+---------------------------+------------------------+-------------------------+--------------------------------+--------------------------------+-----------------------------+
|city_name|           datetime|temperature_2m|relative_humidity_2m|dew_point_2m|apparent_temperature|prec

In [36]:
# Logical checks on the timestamp: reject the bare placeholder "00:00:00" and
# anything that does not parse with the "yyyy-MM-dd HH:mm:ss" pattern.
not_placeholder_time = F.col("datetime") != "00:00:00"
parses_as_timestamp = F.to_timestamp("datetime", "yyyy-MM-dd HH:mm:ss").isNotNull()

# Logical check on the temperature: keep only values of at least -273.15.
# NOTE(review): -273.15 is absolute zero in degrees Celsius — confirm the column's unit.
at_or_above_absolute_zero = F.col("temperature_2m") >= -273.15

hourly_data_2010_2019_cleaned = hourly_data_2010_2019_cleaned.filter(
    not_placeholder_time & parses_as_timestamp & at_or_above_absolute_zero
)

# Preview the dataset after the logical filters.
hourly_data_2010_2019_cleaned.show()

+---------+-------------------+--------------+--------------------+------------+--------------------+-------------+----+--------+----------+------------+------------+----------------+-----------+---------------+---------------+----------------+--------------------------+-----------------------+--------------+---------------+------------------+-------------------+--------------+-------------------------+--------------------------+----------------------------+-----------------------------+----------------------+-----------------------+-------------------------+--------------------------+-------------------+----------------+-----------------+------------------------+------------------------+---------------------+---------------------------+------------------------+-------------------------+--------------------------------+--------------------------------+-----------------------------+
|city_name|           datetime|temperature_2m|relative_humidity_2m|dew_point_2m|apparent_temperature|prec

In [37]:
# Row counts before and after cleaning.
original_count = hourly_data_2010_2019.count()
cleaned_count = hourly_data_2010_2019_cleaned.count()

# Cleanliness = share of source rows that survived cleaning, as a percentage.
surviving_fraction = cleaned_count / original_count
cleanliness_percentage = surviving_fraction * 100

print(f"Data Cleanliness Percentage: {cleanliness_percentage:.2f}%")


Data Cleanliness Percentage: 98.00%


####  I) For the Ibibo website, there is a use case where, for each city, they need to display the best recommended timeslot (for example, 01:00) in a given month (taken as an input). The best recommended timeslot is the one where the average temperature over the years is minimal.

In [38]:
# Step 1: Derive the calendar year and the hour of day from the timestamp.
# The frame is reassigned so the new columns remain available afterwards.
hourly_data_2010_2019 = (
    hourly_data_2010_2019
    .withColumn("year", F.year(F.col("datetime")))
    .withColumn("hour", F.hour(F.col("datetime")))
)

# Step 2: Average temperature for every (city, year, hour) combination.
avg_temperature_df = (
    hourly_data_2010_2019
    .groupBy("city_name", "year", "hour")
    .agg(F.avg("temperature_2m").alias("avg_temperature"))
)

# Step 3: Within each city, rank the (year, hour) rows by ascending average
# temperature and keep only the first one, i.e. the coldest combination.
coldest_first_per_city = Window.partitionBy("city_name").orderBy("avg_temperature")
best_timeslot_df = (
    avg_temperature_df
    .withColumn("rank", F.row_number().over(coldest_first_per_city))
    .filter(F.col("rank") == 1)
    .drop("rank")
)

# Step 4: Show the results.
best_timeslot_df.show()

+------------+----+----+------------------+
|   city_name|year|hour|   avg_temperature|
+------------+----+----+------------------+
|   Abu Dhabi|2013|   2| 22.95972602739726|
|   Amsterdam|2010|   4| 7.049863013698625|
|      Athens|2011|   4| 13.45287671232875|
|     Atlanta|2014|  11|11.524109589041096|
|    Auckland|2012|  17|12.936338797814212|
|      Austin|2014|  12|12.277260273972606|
|     Bangkok|2011|  23| 23.78191780821918|
|   Barcelona|2019|   5|12.643561643835634|
|     Beijing|2010|  22| 6.852328767123295|
|      Berlin|2010|   4| 5.457260273972602|
|      Bilbao|2017|   5|11.470410958904104|
|  Birmingham|2010|   5| 5.676438356164383|
|      Bogota|2018|  11| 8.826849315068493|
|      Boston|2010|  11|14.360821917808217|
|    Brisbane|2018|  19|15.965479452054812|
|    Brussels|2010|   4| 6.713150684931508|
|   Bucharest|2011|   3| 7.418904109589038|
|    Budapest|2010|   4| 7.152054794520543|
|Buenos Aires|2011|   9|14.287397260273966|
|       Busan|2018|  21|11.29835

In [44]:
# Take the month as user input and reject anything outside 1-12 up front;
# an out-of-range month would otherwise silently produce an empty result.
input_month = int(input("Enter the month (1-12): "))
if not 1 <= input_month <= 12:
    raise ValueError(f"Month must be between 1 and 12, got {input_month}")

# Derive both the month and the hour of day from the timestamp here, so this cell
# does not depend on another cell having added the "hour" column beforehand.
hourly_data_2010_2019 = (
    hourly_data_2010_2019
    .withColumn("month", F.month(F.col("datetime")))
    .withColumn("hour", F.hour(F.col("datetime")))
)

# Keep only the rows that fall in the selected month (across all years).
filtered_data = hourly_data_2010_2019.filter(F.col("month") == input_month)

# Average temperature per city and hour of day over all years for that month.
avg_temperature_filtered_df = (
    filtered_data
    .groupBy("city_name", "hour")
    .agg(F.avg("temperature_2m").alias("avg_temperature"))
)

# Rank hours within each city by ascending average temperature.
# - nulls are ordered last so an hour without any usable temperature cannot win
# - "hour" is a tie-breaker so the chosen row is deterministic when averages are equal
coldest_hour_first = Window.partitionBy("city_name").orderBy(
    F.col("avg_temperature").asc_nulls_last(), F.col("hour")
)
best_timeslot_filtered_df = (
    avg_temperature_filtered_df
    .withColumn("rank", F.row_number().over(coldest_hour_first))
    .filter(F.col("rank") == 1)
    .drop("rank")
)

# Display the best recommended timeslot for each city.
print(f"Best recommended timeslots for month {input_month}:")
best_timeslot_filtered_df.show()


Enter the month (1-12):  4


Best recommended timeslots for month 4:
+------------+----+------------------+
|   city_name|hour|   avg_temperature|
+------------+----+------------------+
|   Abu Dhabi|   2|22.766999999999996|
|   Amsterdam|   5| 6.273999999999995|
|      Athens|   4|11.988000000000005|
|     Atlanta|  11|11.557333333333338|
|    Auckland|  18|14.925333333333331|
|      Austin|  12|13.091000000000001|
|     Bangkok|  23|26.202333333333335|
|   Barcelona|   5|10.976666666666668|
|     Beijing|  22| 7.559666666666666|
|      Berlin|   4|             4.824|
|      Bilbao|   5|10.704000000000004|
|  Birmingham|   5| 4.527333333333331|
|      Bogota|  11|10.925333333333338|
|      Boston|  11| 14.70266666666668|
|    Brisbane|  20|17.094999999999995|
|    Brussels|   5|             5.913|
|   Bucharest|   4|  7.44333333333334|
|    Budapest|   4| 7.496000000000002|
|Buenos Aires|  10|15.487666666666668|
|       Busan|  21| 9.679333333333334|
+------------+----+------------------+
only showing top 20 rows

t=2024-12-24T16:51:23+0530 lvl=eror msg="heartbeat timeout, terminating session" obj=tunnels.session obj=csess id=d31176104590 clientid=752f240cb7c88a5a788603a08bb891a3
t=2024-12-24T16:51:23+0530 lvl=eror msg="session closed, starting reconnect loop" obj=tunnels.session obj=csess id=e53aa43dcdd6 err="session closed"


#### II) Another use case: the customer wants a list of cities, ordered in ascending fashion, where there was no rain for any 2 consecutive months.

In [39]:
# Step 1: Derive the calendar year and month from the timestamp.
hourly_data_2010_2019 = (
    hourly_data_2010_2019
    .withColumn("year", F.year(F.col("datetime")))
    .withColumn("month", F.month(F.col("datetime")))
)

# Step 2: Flag each hourly record: 1 when the 'rain' value is greater than zero, else 0.
# NOTE(review): a null or non-numeric 'rain' value also yields 0 (treated as "no rain") — confirm this is intended.
hourly_data_2010_2019 = hourly_data_2010_2019.withColumn("rain_flag", F.when(F.col("rain") > 0, 1).otherwise(0))

# Step 3: Number of rainy hours per city and calendar month.
# month_index is a running month number (year * 12 + month) used to test adjacency.
rain_data = (
    hourly_data_2010_2019
    .groupBy("city_name", "year", "month")
    .agg(F.sum("rain_flag").alias("total_rain"))
    .withColumn("month_index", F.col("year") * 12 + F.col("month"))
)

# Step 4: For every city, look at each month together with the row that follows it
# in chronological order.
window_spec = Window.partitionBy("city_name").orderBy("year", "month")
rain_data_with_consecutive_check = (
    rain_data
    .withColumn("next_month_rain", F.lead("total_rain").over(window_spec))
    .withColumn("next_month_index", F.lead("month_index").over(window_spec))
)

# Keep a month only when it starts a genuine pair of consecutive dry months:
# - this month has no rainy hour,
# - the following row has no rainy hour (a missing following row no longer counts,
#   so a city's last dry month on its own does not qualify), and
# - the following row really is the next calendar month (no gap in the data).
valid_cities = rain_data_with_consecutive_check.filter(
    (F.col("total_rain") == 0)
    & (F.col("next_month_rain") == 0)
    & (F.col("next_month_index") == F.col("month_index") + 1)
)

# Step 5: Unique city names in ascending order.
ordered_cities = valid_cities.select("city_name").distinct().orderBy("city_name")

# Step 6: Show the results.
ordered_cities.show()

+----------+
| city_name|
+----------+
| Abu Dhabi|
|    Athens|
|   Bangkok|
|   Beijing|
|    Berlin|
|Birmingham|
|  Brisbane|
| Bucharest|
|  Budapest|
|   Calgary|
|Copenhagen|
|      Doha|
|     Dubai|
|Gothenburg|
|   Hamburg|
|     Hanoi|
|  Helsinki|
| Hong Kong|
|  Istanbul|
|    Krakow|
+----------+
only showing top 20 rows



#### III) Create Hive table(s) with partitioning and bucketing (if needed) for a customer who frequently fires queries like "in the year x, month y and day z, get me all the records ordered by dew_point descending"



---

## **Steps to Import and Query Data in Hive**

### **Step 1: Import Data to Hive**

We assume you have the CSV file named `hourly_data_combined_2010_to_2019.csv`.

#### **1.1 Create a Staging Table**
Create a staging table to load the raw CSV data.

```sql
CREATE TABLE hourly_data_staging (
    city_name STRING,
    datetime STRING,
    temperature_2m DOUBLE,
    relative_humidity_2m DOUBLE,
    dew_point DOUBLE,
    apparent_temperature DOUBLE,
    precipitation DOUBLE,
    weather_code STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
```

![Staging Table](./images/img1.jpeg)

#### **1.2 Load Data into the Staging Table**
Upload the CSV file to HDFS and load it into the staging table:

1. Use the following commands to upload the CSV file to a landing directory in HDFS. Keep it outside the table's warehouse directory, because `LOAD DATA INPATH` moves the file into the table location:

   ```bash
   hdfs dfs -mkdir -p /user/hive/landing
   hdfs dfs -put /path/to/hourly_data_combined_2010_to_2019.csv /user/hive/landing/
   ```

2. Load the data into Hive:

   ```sql
   LOAD DATA INPATH '/user/hive/landing/hourly_data_combined_2010_to_2019.csv'
   INTO TABLE hourly_data_staging;
   ```

![Load Data](./images/img2.jpeg)

---

### **Step 2: Create a Partitioned and Bucketed Table**

#### **2.1 Create the Final Table**
Create a table partitioned by `year`, `month`, and `day` and bucketed by `city_name`.

```sql
CREATE TABLE weather_data (
    city_name STRING,
    dew_point DOUBLE,
    temperature_2m DOUBLE,
    relative_humidity_2m DOUBLE,
    apparent_temperature DOUBLE,
    precipitation DOUBLE,
    weather_code STRING
)
PARTITIONED BY (year INT, month INT, day INT)
CLUSTERED BY (city_name) INTO 10 BUCKETS
STORED AS ORC
TBLPROPERTIES ("orc.compress"="ZLIB");
```

![Final Table](./images/img3.jpeg)

---

### **Step 3: Transform and Insert Data into the Partitioned Table**

#### **3.1 Enable Dynamic Partitioning and Bucketing**
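Set Hive properties for dynamic partitioning and bucketing. Ten years of daily partitions (roughly 3,650) exceed Hive's default dynamic-partition limits (1000 in total, 100 per node), so raise those limits as well:

```sql
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.exec.max.dynamic.partitions = 5000;
SET hive.exec.max.dynamic.partitions.pernode = 5000;
SET hive.enforce.bucketing = true;
```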

![Dynamic Partitioning](./images/img4.jpeg)

#### **3.2 Insert Data into the Partitioned Table**
Transform the data from the staging table and populate the partitioned table:

```sql
INSERT OVERWRITE TABLE weather_data PARTITION (year, month, day)
SELECT
    city_name,
    dew_point,
    temperature_2m,
    relative_humidity_2m,
    apparent_temperature,
    precipitation,
    weather_code,
    YEAR(TO_DATE(datetime)) AS year,
    MONTH(TO_DATE(datetime)) AS month,
    DAY(TO_DATE(datetime)) AS day
FROM hourly_data_staging;
```

![Insert Data](./images/img5.jpeg)

---

### **Step 4: Query the Data**

#### **4.1 Query for a Specific Year, Month, and Day**
Retrieve data for a specific date and order it by `dew_point` in descending order:

```sql
SELECT *
FROM weather_data
WHERE year = 2015 AND month = 8 AND day = 15
ORDER BY dew_point DESC;
```

![Query Data](./images/img6.jpeg)

#### **4.2 Verify Partitioning**
Check the partitions created for the table:

```sql
SHOW PARTITIONS weather_data;
```

![Show Partitions](./images/img7.jpeg)

#### **4.3 Analyze the Table**
Collect statistics for better query planning:

```sql
ANALYZE TABLE weather_data PARTITION (year, month, day) COMPUTE STATISTICS;
```

---

### **Notes**
- Queries that filter on `year`, `month`, and `day` are efficient due to partitioning.
- Bucketing ensures an even distribution of `city_name` for faster access.
- Use the ORC format for better compression and performance.

In [40]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
!pip install pyngrok
!pip install python-dotenv
from dotenv import load_dotenv
import os
from pyngrok import ngrok
load_dotenv()

'wget' is not recognized as an internal or external command,
operable program or batch file.
'unzip' is not recognized as an internal or external command,
operable program or batch file.




True

In [41]:
# Read the ngrok auth token from the environment (for example from the .env
# file loaded earlier). Never paste the token into the notebook itself.
auth_token = os.getenv("AUTH_TOKEN")
if not auth_token:
    # Fail early with an actionable message instead of passing None to pyngrok.
    raise RuntimeError(
        "AUTH_TOKEN is not set. Add AUTH_TOKEN=<your ngrok auth token> to the "
        ".env file or export it as an environment variable."
    )
ngrok.set_auth_token(auth_token)

# 4050 is the port configured as `spark.ui.port` when the SparkSession was built,
# so the tunnel exposes the Spark UI.
public_url = ngrok.connect(4050)
print("Ngrok Tunnel URL:", public_url)

Ngrok Tunnel URL: NgrokTunnel: "https://57d5-2401-4900-1cbd-2842-c1fc-32e7-1bc9-42eb.ngrok-free.app" -> "http://localhost:4050"
