In [None]:
import json
import os

import boto3
import requests
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from dotenv import load_dotenv

In [None]:
# Load variables from a local .env file into the process environment.
# The function has to be called; a bare `load_dotenv` reference does nothing.
load_dotenv()

# Credentials come from the environment; os.getenv returns None when a variable is unset.
aws_access_key = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')

# get_weather_api() reads this module-level name.
# NOTE(review): the environment variable name below is an assumption -
# confirm it matches the key used in the .env file.
weather_access_key = os.getenv('WEATHER_ACCESS_KEY')

In [None]:
def get_weather_api(query="London", timeout=10):
    """
    Fetches the current weather from the weatherstack API.

    Reads the module-level name `weather_access_key`, which must be defined
    before this function is called.

    Parameters
    str: query - location sent as the API's "query" parameter (default "London").
    float: timeout - seconds to wait for the server before giving up (default 10).

    Returns
    dict | None: the decoded JSON body on success; None when the request fails,
    the body is not valid JSON, or the API reports an error.
    """
    # NOTE(review): plain HTTP sends the access key unencrypted - switch to
    # https if the subscription plan allows it.
    url = "http://api.weatherstack.com/current"
    querystring = {"access_key": weather_access_key, "query": query}

    try:
        # Without a timeout, requests waits indefinitely on an unresponsive server.
        response = requests.get(url, params=querystring, timeout=timeout)
        response.raise_for_status()
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        print(f"Request failed: {e}")
        return None

    # weatherstack signals API-level failures (bad key, unknown location, ...)
    # in the JSON body with "success": false rather than via the HTTP status.
    if isinstance(payload, dict) and payload.get("success") is False:
        print(f"Request failed: {payload.get('error')}")
        return None

    return payload


In [None]:
def create_dataframe(json_response):
    """
    Builds a single-row Spark DataFrame from a weather API response.

    Relies on `spark`, `StructType`, `StructField`, `StringType` and
    `IntegerType` being available in the notebook namespace; they are not
    imported in this function.

    Parameters
    dict: json_response - decoded API response containing the "location"
          and "current" objects.

    Returns
    pyspark.sql.DataFrame: one row with the columns defined in the schema below.

    Raises
    KeyError: if an expected key is missing from json_response.
    """
    location = json_response["location"]
    current = json_response["current"]
    descriptions = current["weather_descriptions"]

    data = [[
        location["localtime"],
        current["observation_time"],
        current["temperature"],
        # The column is nullable, so an empty description list becomes None
        # instead of raising IndexError.
        descriptions[0] if descriptions else None,
        current["wind_speed"],
        current["wind_degree"],
        current["precip"],
        current["humidity"],
        current["cloudcover"],
        current["feelslike"],
        current["uv_index"],
        current["visibility"],
    ]]

    # NOTE(review): every numeric field is declared IntegerType - confirm the
    # API never returns fractional values (e.g. for precip).
    columns = StructType([
        StructField("local_time", StringType(), True),
        StructField("observation_time", StringType(), True),
        StructField("temperature", IntegerType(), True),
        StructField("weather_description", StringType(), True),
        StructField("wind_speed", IntegerType(), True),
        StructField("wind_degree", IntegerType(), True),
        StructField("precipitation", IntegerType(), True),
        StructField("humidity", IntegerType(), True),
        StructField("cloudcover", IntegerType(), True),
        StructField("feelslike", IntegerType(), True),
        StructField("uv_index", IntegerType(), True),
        StructField("visibility", IntegerType(), True),
    ])

    weather_data = spark.createDataFrame(data, schema=columns)

    return weather_data


In [None]:
def standardise_weather_df_columns(weather_df):
    """
    Standardises the weather DataFrame.
    - Splits 'local_time' into 'localDate' and 'localTime' columns and drops
      the original 'local_time' column.
    - Converts 'observation_time' from 12-hour format to 24-hour format.
    - Converts 'wind_speed' from km/h to mph (divides by 1.609).

    Relies on `F` (used here as pyspark.sql.functions) being available in the
    notebook namespace; it is not imported in this function.

    Parameters
    pyspark.sql.DataFrame: weather_df

    Returns
    pyspark.sql.DataFrame: The transformed weather DataFrame.
    """
    # Raw string: "\s" in a normal string literal is an invalid escape sequence.
    local_time_parts = F.split(F.col("local_time"), r"\s+")

    return (
        weather_df
        # split local_time into date and time columns, then drop the source column
        .withColumn("localDate", local_time_parts.getItem(0))
        .withColumn("localTime", local_time_parts.getItem(1))
        .drop("local_time")
        # change observation time from 12 hours to 24 hours
        .withColumn(
            "observation_time",
            F.date_format(F.to_timestamp("observation_time", "hh:mm a"), "HH:mm"),
        )
        # convert wind_speed from km/h to mph
        .withColumn("wind_speed", F.col("wind_speed") / 1.609)
    )
    


In [None]:
def upload_to_bucket(weather_data, bucket="weather_data", key="raw_weather_data"):
    """
    Uploads the raw weather data to S3 as a JSON object.

    Uses the module-level `aws_access_key` and `aws_secret_key` for
    authentication. Failures are printed rather than raised.

    Parameters
    dict | list | str | bytes | None: weather_data - payload to upload. str is
        UTF-8 encoded, bytes are sent unchanged, anything else is serialised
        with json.dumps. None skips the upload.
    str: bucket - destination bucket name (default "weather_data").
    str: key - destination object key (default "raw_weather_data").

    Returns
    None
    """
    if weather_data is None:
        print("No weather data to upload.")
        return

    # NOTE(review): S3 naming rules for new buckets do not allow underscores -
    # confirm "weather_data" is the real bucket name.
    try:
        if isinstance(weather_data, bytes):
            body = weather_data
        elif isinstance(weather_data, str):
            body = weather_data.encode('utf-8')
        else:
            # The API response arrives as a parsed dict, which has no .encode().
            body = json.dumps(weather_data).encode('utf-8')

        # Client creation is inside the try because supplying only one of the
        # two keys raises PartialCredentialsError here, before any request.
        s3_client = boto3.client(
            's3',
            aws_access_key_id=aws_access_key,
            aws_secret_access_key=aws_secret_key,
            region_name='eu-west-2',
        )
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=body,
            ContentType='application/json',
        )
    except PartialCredentialsError:
        print("Partial credentials provided, please check your access key and secret key")
    except NoCredentialsError:
        print("Credentials not available or incorrect")
    except Exception as e:
        print(f"Upload to s3://{bucket}/{key} failed: {e}")

In [None]:
def fetch_and_process_data():
    """
    Runs the pipeline: fetch, build DataFrame, upload raw data, standardise.

    Calls get_weather_api(); when it returns a response, passes that response
    to create_dataframe() and upload_to_bucket(), then returns the result of
    standardise_weather_df_columns() applied to the DataFrame.

    Returns
    The value returned by standardise_weather_df_columns, or None when
    get_weather_api() returned None.
    """
    weather_response = get_weather_api()
    if weather_response is None:
        # Nothing was fetched; returning here avoids referencing a result
        # variable that was never assigned.
        return None

    weather_df = create_dataframe(weather_response)
    upload_to_bucket(weather_response)
    return standardise_weather_df_columns(weather_df)


    
