##### Description:
Integrate daily pricing data, geolocation data, and weather data, and publish the result in the Gold layer as source data for future price prediction using AI.

##### Source Tables:
pricing_analytics.silver.daily_pricing_silver, pricing_analytics.silver.geo_location_silver, pricing_analytics.silver.weather_data_silver

##### Target Table Name: DataLake_Price_Prediction_Gold
###### Target Table Column Mappings:
| SOURCE_TABLE_NAME | SOURCE_COLUMN_NAME | DATALAKE_TABLE_NAME | DATALAKE_COLUMN_NAME | TRANSFORMATION RULE | CONDITIONS |
| --- | --- |--- | --- |--- |--- |
| silver.daily_pricing_silver	| DATE_OF_PRICING	|datalake_price_prediction_gold| DATE_OF_PRICING| Direct Mapping |  |
| silver.daily_pricing_silver	| STATE_NAME	|datalake_price_prediction_gold| STATE_NAME| Direct Mapping |  |
| silver.daily_pricing_silver	| MARKET_NAME	|datalake_price_prediction_gold|  MARKET_NAME	| Direct Mapping |  |
| silver.daily_pricing_silver | PRODUCTGROUP_NAME | datalake_price_prediction_gold | PRODUCTGROUP_NAME | Direct Mapping |  |
| silver.daily_pricing_silver | PRODUCT_NAME | datalake_price_prediction_gold | PRODUCT_NAME | Direct Mapping |  |
| silver.daily_pricing_silver | VARIETY | datalake_price_prediction_gold | VARIETY | Direct Mapping |  |
| silver.daily_pricing_silver	| ROW_ID	|datalake_price_prediction_gold| ROW_ID| Direct Mapping |  |
| silver.daily_pricing_silver	| ARRIVAL_IN_TONNES	|datalake_price_prediction_gold| ARRIVAL_IN_TONNES| Direct Mapping |  |
| silver.daily_pricing_silver	| MINIMUM_PRICE	|datalake_price_prediction_gold| MINIMUM_PRICE| Direct Mapping |  |
| silver.daily_pricing_silver	| MAXIMUM_PRICE	|datalake_price_prediction_gold| MAXIMUM_PRICE| Direct Mapping |  |
| silver.daily_pricing_silver	| MODAL_PRICE	|datalake_price_prediction_gold| MODAL_PRICE| Direct Mapping |  |
| silver.geo_location_silver 	| latitude	|datalake_price_prediction_gold| MARKET_LATITUDE| Change the Source Column Name | daily_pricing_silver.STATE_NAME = geo_location_silver.stateName AND daily_pricing_silver.MARKET_NAME = geo_location_silver.marketName AND geo_location_silver.countryName = 'India' |
| silver.geo_location_silver 	| longitude	|datalake_price_prediction_gold| MARKET_LONGITUDE| Change the Source Column Name |  |
| silver.geo_location_silver 	| population	|datalake_price_prediction_gold| MARKET_POPULATION| Change the Source Column Name  |  |
| silver.weather_data_silver 	| unitOfTemparature	|datalake_price_prediction_gold| TEMPARATURE_UNIT| Change the Source Column Name  | daily_pricing_silver.MARKET_NAME = weather_data_silver.marketName AND daily_pricing_silver.DATE_OF_PRICING = weather_data_silver.weatherDate |
| silver.weather_data_silver 	| maximumTemparature	|datalake_price_prediction_gold| MARKET_MAX_TEMPARATURE | Change the Source Column Name  |  |
| silver.weather_data_silver 	| minimumTemparature	|datalake_price_prediction_gold| MARKET_MIN_TEMPARATURE | Change the Source Column Name  |  |
| silver.weather_data_silver 	| unitOfRainFall	|datalake_price_prediction_gold| RAINFALL_UNIT| Change the Source Column Name  |  |
| silver.weather_data_silver 	| rainFall	|datalake_price_prediction_gold| MARKET_DAILY_RAINFALL| Change the Source Column Name  |  |
| DERIVED	| DERIVED	|datalake_price_prediction_gold	| lakehouse_inserted_date	| Load current_timestamp() | |
| DERIVED	| DERIVED	|datalake_price_prediction_gold	| lakehouse_updated_date	| Load current_timestamp() | |
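
The "Change the Source Column Name" rules in the mapping can be kept as plain data and turned into select expressions. The following is a minimal sketch rather than the notebook's own method: `GEO_RENAMES`, `WEATHER_RENAMES`, and `alias_expressions` are hypothetical names, and the source column spellings are copied from the mapping table as written, so they should be checked against the actual silver schemas.

```python
# Hypothetical helper: express the rename rules from the mapping table as data.
# Source column spellings are copied verbatim from the mapping table (assumption:
# they match the silver table schemas).
GEO_RENAMES = {
    "latitude": "MARKET_LATITUDE",
    "longitude": "MARKET_LONGITUDE",
    "population": "MARKET_POPULATION",
}

WEATHER_RENAMES = {
    "unitOfTemparature": "TEMPARATURE_UNIT",
    "maximumTemparature": "MARKET_MAX_TEMPARATURE",
    "minimumTemparature": "MARKET_MIN_TEMPARATURE",
    "unitOfRainFall": "RAINFALL_UNIT",
    "rainFall": "MARKET_DAILY_RAINFALL",
}


def alias_expressions(renames):
    """Return SQL-style 'source AS target' strings, one per renamed column."""
    return [f"{source} AS {target}" for source, target in renames.items()]


geo_exprs = alias_expressions(GEO_RENAMES)
weather_exprs = alias_expressions(WEATHER_RENAMES)
print(geo_exprs)
```

In a Databricks notebook, strings of this form could be passed to `DataFrame.selectExpr` together with the join key columns, for example `selectExpr(*geo_exprs, "stateName", "marketName", "countryName")` on a DataFrame read from geo_location_silver.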


- <a href="https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html" target="_blank">**MERGE INTO**</a>
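
The MERGE INTO statement linked above offers an incremental alternative to rewriting the whole table. The sketch below only builds the SQL text. The helper name `build_merge_sql`, the staging view name `price_prediction_updates`, and the choice of `ROW_ID` as the match key are assumptions for illustration; this notebook does not specify them.

```python
def build_merge_sql(target, source, key="ROW_ID"):
    """Build a Delta Lake MERGE INTO statement that upserts source rows into target.

    Assumption: `key` uniquely identifies a row in both target and source.
    """
    return (
        f"MERGE INTO {target} t\n"
        f"USING {source} s\n"
        f"ON t.{key} = s.{key}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )


merge_sql = build_merge_sql(
    target="pricing_analytics.gold.PRICE_PREDICTION_GOLD",
    source="price_prediction_updates",  # hypothetical temp view holding new rows
)
print(merge_sql)
```

In a notebook the statement could be run with `spark.sql(merge_sql)`. Note that `UPDATE SET *` replaces every column of a matched row, including `lakehouse_inserted_date`; preserving the original insert time would need an explicit column list instead.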

In [0]:
# Import only the functions this notebook uses instead of a wildcard import.
from pyspark.sql.functions import col, current_timestamp

In [0]:
dailyPricingDF = spark.sql("select * from pricing_analytics.silver.daily_pricing_silver")

In [0]:
dailyPricingOUT = dailyPricingDF.select(
                                        col("STATE_NAME"),
                                        col("MARKET_NAME"),
                                        col("PRODUCTGROUP_NAME"),
                                        col("PRODUCT_NAME"),
                                        col("VARIETY"),
                                        col("ROW_ID"),
                                        col("MINIMUM_PRICE"),
                                        col("MAXIMUM_PRICE"),
                                        col("ARRIVAL_IN_TONNES"),
                                        col("MODAL_PRICE"),
                                        col("DATE_OF_PRICING")
                                         )


In [0]:
display(dailyPricingOUT)

In [0]:
geoLocationDF = spark.sql("select * from pricing_analytics.silver.geo_location_silver")

In [0]:
geoLocationOUT = geoLocationDF.select( col("latitude").alias("MARKET_LATITUDE"),
                                        col("longitude").alias("MARKET_LONGITUDE"),
                                        col("population").alias("MARKET_POPULATION"),
                                        col("stateName"),
                                        col("marketName"),
                                        col("countryName")
                                     )

In [0]:
display(geoLocationOUT)

In [0]:
weatherDataDF = spark.sql("select * from pricing_analytics.silver.weather_data_silver")

In [0]:
weatherDataOUT = (weatherDataDF.select( col("unitOfTemperature").alias("TEMPARATURE_UNIT"),
                                        col("maximumTemparature").alias("MARKET_MAX_TEMPARATURE"),
                                        col("minimumTemparature").alias("MARKET_MIN_TEMPARATURE"),
                                        col("unitOfRainFall").alias("RAINFALL_UNIT"),
                                        col("rainFall").alias("MARKET_DAILY_RAINFALL"),
                                        col("marketName"),
                                        col("weatherDate")
                                      ).withColumn("lakehouse_inserted_date", current_timestamp())
                                       .withColumn("lakehouse_updated_date", current_timestamp()) )


In [0]:
display(weatherDataOUT)

In [0]:
finalDF = (
    dailyPricingOUT
    .join(geoLocationOUT,
          (dailyPricingOUT.MARKET_NAME == geoLocationOUT.marketName)
          & (dailyPricingOUT.STATE_NAME == geoLocationOUT.stateName)
          & (geoLocationOUT.countryName == 'India'),
          'inner')
    .join(weatherDataOUT,
          (dailyPricingOUT.MARKET_NAME == weatherDataOUT.marketName)
          & (dailyPricingOUT.DATE_OF_PRICING == weatherDataOUT.weatherDate),
          'inner')
    # Drop join keys that duplicate columns already carried by daily pricing.
    .drop("stateName", "countryName", "marketName", "weatherDate")
)

finalDF.write.mode("overwrite").saveAsTable("pricing_analytics.gold.PRICE_PREDICTION_GOLD")



##### Step 1: Create and Load Gold Layer Price Prediction Table

1. SELECT the source columns listed in the mapping above from the source table pricing_analytics.silver.daily_pricing_silver.
1. SELECT the source columns listed in the mapping above from the source table pricing_analytics.silver.geo_location_silver and rename them to the target column names given in the mapping.
1. JOIN pricing_analytics.silver.daily_pricing_silver to pricing_analytics.silver.geo_location_silver using the join conditions given in the mapping.
1. SELECT the source columns listed in the mapping above from the source table pricing_analytics.silver.weather_data_silver and rename them to the target column names given in the mapping.
1. JOIN pricing_analytics.silver.daily_pricing_silver to pricing_analytics.silver.weather_data_silver using the join conditions given in the mapping.
1. Map the current_timestamp() function to the additional new columns lakehouse_inserted_date and lakehouse_updated_date.
1. CREATE the target table to store the output of the SELECT statement and publish the transformed data.
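
The steps above can also be sketched as a single CREATE TABLE AS SELECT statement. This is an illustration under stated assumptions, not the notebook's method: the target name is taken from the `saveAsTable` call in this notebook, the aliases `p`, `g`, and `w` are arbitrary, and the source column spellings follow the mapping table (the notebook's own select uses `unitOfTemperature`, so the spelling should be checked against the actual schema).

```python
# Assumption: target name taken from the saveAsTable call in this notebook.
GOLD_TABLE = "pricing_analytics.gold.PRICE_PREDICTION_GOLD"

# Column names and join conditions are copied from the mapping table.
ctas_sql = f"""
CREATE OR REPLACE TABLE {GOLD_TABLE} AS
SELECT
    p.DATE_OF_PRICING, p.STATE_NAME, p.MARKET_NAME,
    p.PRODUCTGROUP_NAME, p.PRODUCT_NAME, p.VARIETY, p.ROW_ID,
    p.ARRIVAL_IN_TONNES, p.MINIMUM_PRICE, p.MAXIMUM_PRICE, p.MODAL_PRICE,
    g.latitude   AS MARKET_LATITUDE,
    g.longitude  AS MARKET_LONGITUDE,
    g.population AS MARKET_POPULATION,
    w.unitOfTemparature  AS TEMPARATURE_UNIT,
    w.maximumTemparature AS MARKET_MAX_TEMPARATURE,
    w.minimumTemparature AS MARKET_MIN_TEMPARATURE,
    w.unitOfRainFall     AS RAINFALL_UNIT,
    w.rainFall           AS MARKET_DAILY_RAINFALL,
    current_timestamp()  AS lakehouse_inserted_date,
    current_timestamp()  AS lakehouse_updated_date
FROM pricing_analytics.silver.daily_pricing_silver p
INNER JOIN pricing_analytics.silver.geo_location_silver g
    ON  p.STATE_NAME  = g.stateName
    AND p.MARKET_NAME = g.marketName
    AND g.countryName = 'India'
INNER JOIN pricing_analytics.silver.weather_data_silver w
    ON  p.MARKET_NAME     = w.marketName
    AND p.DATE_OF_PRICING = w.weatherDate
""".strip()

print(ctas_sql)
```

In a notebook the statement could be run with `spark.sql(ctas_sql)`. Because both joins are inner joins, pricing rows without a matching geolocation or weather row are left out of the target table.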

##### Step 2: Test The Data Stored in Gold Layer Table And Highlight Any Data Quality Issues

1. Write a SELECT query to read the data from the pricing_analytics.gold.DataLake_Price_Prediction_Gold table.
1. Check the data for any one market name and make sure there are no data quality issues.
1. Raise any data quality issues found in the target table as a query in the Udemy course.
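
The checks above could start from a few simple queries. This is a hedged sketch: the target name is taken from the `saveAsTable` call in this notebook, `market_sample_sql` is a hypothetical helper, and the two rules checked (unique `ROW_ID`, and minimum <= modal <= maximum price) are assumptions about what counts as a data quality issue here.

```python
# Assumption: target name taken from the saveAsTable call in this notebook.
GOLD_TABLE = "pricing_analytics.gold.PRICE_PREDICTION_GOLD"


def market_sample_sql(market_name, limit=100):
    """Select rows for one market so they can be inspected by eye.

    The market name is inserted as-is, so it should be a trusted literal.
    """
    return (
        f"SELECT * FROM {GOLD_TABLE} "
        f"WHERE MARKET_NAME = '{market_name}' "
        f"ORDER BY DATE_OF_PRICING LIMIT {int(limit)}"
    )


# Rows sharing a ROW_ID may indicate that a join matched more than one
# geolocation or weather row (assumes ROW_ID is unique in the silver table).
duplicate_row_id_sql = (
    f"SELECT ROW_ID, COUNT(*) AS row_count FROM {GOLD_TABLE} "
    "GROUP BY ROW_ID HAVING COUNT(*) > 1"
)

# Prices that break the assumed ordering minimum <= modal <= maximum.
price_order_sql = (
    f"SELECT * FROM {GOLD_TABLE} "
    "WHERE MINIMUM_PRICE > MODAL_PRICE OR MODAL_PRICE > MAXIMUM_PRICE"
)

print(market_sample_sql("Example Market", limit=5))  # placeholder market name
```

Each string could be passed to `spark.sql(...)` and shown with `display(...)`. An empty result from the last two queries would suggest that those particular rules hold, though it does not rule out other issues.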