#### Gold Layer Processing - Weather Event API

In [19]:
from pyspark.sql.functions import col, udf, when, year, month, dayofmonth, hour, to_date
from pyspark.sql.types import StringType, TimestampType, StructType, StructField  # Hinzugefügt
import reverse_geocoder as rg

df = spark.read.table("weather_data_silver")

StatementMeta(, 90da515d-b47a-4476-abf2-bcea032c1ffb, 23, Finished, Available, Finished)

StatementMeta(, 90da515d-b47a-4476-abf2-bcea032c1ffb, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f848a732-6dca-489b-8036-9acc4a3038c6)

In [20]:
def get_location_details(lat, lon):
    """
    Retrieve country code and city name for a given latitude and longitude.

    Parameters:
    lat (float): Latitude of the location.
    lon (float): Longitude of the location.

    Returns:
    tuple: (country_code, city_name)

    Example:
    >>> get_location_details(52.52, 13.419998)
    ('DE', 'Berlin')
    """
    coordinates = (float(lat), float(lon))
    result = rg.search(coordinates)[0]
    return (result.get('cc'), result.get('name'))


StatementMeta(, 90da515d-b47a-4476-abf2-bcea032c1ffb, 24, Finished, Available, Finished)

In [21]:
# UDF registrieren
get_location_udf = udf(get_location_details, StructType([
    StructField("country_code", StringType(), False),
    StructField("city", StringType(), False)
]))

StatementMeta(, 90da515d-b47a-4476-abf2-bcea032c1ffb, 25, Finished, Available, Finished)

In [26]:
df_with_location = df.withColumn("location", get_location_udf(col("latitude"), col("longitude")))\
 .select(
    "elevation",
    "generationtime_ms",
    "latitude",
    "longitude",
    "timezone",
    "temperature_unit",
    "time",
    "temperature",
    col("location.country_code").alias("country"),
    col("location.city").alias("city")
 )

StatementMeta(, 90da515d-b47a-4476-abf2-bcea032c1ffb, 30, Finished, Available, Finished)

In [28]:
df_with_temperature_class = df_with_location.withColumn(
    "temperature_class", when(col("temperature") < 5, "Cold")\
   .when(col("temperature") < 14, "Modest")\
   .when(col("temperature") < 21, "Warm")\
   .otherwise("Hot")
   )

StatementMeta(, 90da515d-b47a-4476-abf2-bcea032c1ffb, 32, Finished, Available, Finished)

In [30]:
df_gold = df_with_temperature_class.withColumn("date", to_date(col("time")))\
                                    .withColumn("year", year(col("time")))\
                                    .withColumn("month", month(col("time")))\
                                    .withColumn("day", dayofmonth(col("time")))\
                                    .withColumn("hour", hour(col("time")))
                    

StatementMeta(, 90da515d-b47a-4476-abf2-bcea032c1ffb, 34, Finished, Available, Finished)

In [33]:
df_gold.write.mode("overwrite").saveAsTable("weather_data_gold")

StatementMeta(, 90da515d-b47a-4476-abf2-bcea032c1ffb, 37, Finished, Available, Finished)