## Dataset 2: Extract the weather dataset from the OpenWeatherMap API using asynchronous requests, load it into MongoDB, pre-process, cleanse and transform it, and load the structured data into PostgreSQL

### @Author: Manoj
### @Student Id: x20179189


# Import the required libraries

In [16]:
# import variables
import json
import urllib
import requests
import pymongo
import pprint
import pandas as pd
from pandas import json_normalize
from requests import ConnectionError
import json
import os
import time
import aiohttp
import asyncio
import time
import datetime
from sqlalchemy import create_engine
import sqlalchemy
from waiting import wait
# Module-level working frame shared by the pipeline stages below; each stage
# rebinds it through a `global weatherDf` declaration.
weatherDf: pd.DataFrame = pd.DataFrame()

# Extract data from the OpenWeatherMap API only if the weather collection does not already exist in MongoDB

In [17]:
def checkIfCollectionExists(mongodb, weatherCollectionName):
    """Report whether a collection is present in the given MongoDB database.

    Args:
        mongodb: database handle exposing ``list_collection_names()``.
        weatherCollectionName: collection name to look for.

    Returns:
        ``True`` if the name is listed, otherwise ``False``.
    """
    names = mongodb.list_collection_names()
    return weatherCollectionName in names

In [19]:
def dropCollectionIfExists(mongodb, weatherCollectionName):
    """Drop a collection from a MongoDB database when it is present.

    Args:
        mongodb: database handle exposing ``list_collection_names()`` and
            item access to collections.
        weatherCollectionName: name of the collection to drop.

    Does nothing when the collection is not listed.
    """
    if weatherCollectionName not in mongodb.list_collection_names():
        return
    mongodb[weatherCollectionName].drop()

# Read the city dataset from MongoDB, use each city's latitude and longitude to fetch its forecasted weather asynchronously, and insert the results into the weather collection in MongoDB

In [20]:
async def get_weather_data(session, url, retries=1, retry_delay=20):
    """Fetch one forecast payload from the OpenWeatherMap API.

    Sends a GET request for ``url`` on ``session`` and decodes the body as
    JSON. On a client-side failure (``aiohttp.ClientError``, the base class of
    aiohttp's connection, OS, payload and response errors) or a timeout, the
    coroutine waits ``retry_delay`` seconds and tries again on the SAME
    session, up to ``retries`` extra attempts. The last error is re-raised.

    Args:
        session: open ``aiohttp.ClientSession`` owned by the caller. It is
            reused for retries and never closed here.
        url: fully formatted request URL (coordinates and API key included).
        retries: number of additional attempts after the first failure.
        retry_delay: seconds to wait before each retry.

    Returns:
        The decoded JSON body. The HTTP status is not checked, so an API
        error payload is returned as-is.

    Raises:
        aiohttp.ClientError or asyncio.TimeoutError: when every attempt fails.
    """
    attempt = 0
    while True:
        try:
            async with session.get(url) as resp:
                return await resp.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as err:
            if attempt >= retries:
                raise
            attempt += 1
            print("request failed ({}: {}); retrying in {}s".format(
                type(err).__name__, err, retry_delay))
            # Back off before retrying; the caller's session stays open, so
            # no replacement session (and no leaked one) is needed.
            await asyncio.sleep(retry_delay)


async def save_weather_data(session, dynamicUrl, cityId, weatherCollection, connection_timeout):
    """Fetch the forecast for one city and insert it into MongoDB.

    Args:
        session: aiohttp session passed through to ``get_weather_data``.
        dynamicUrl: fully formatted API URL for this city.
        cityId: identifier stored on the document as ``cityId``.
        weatherCollection: pymongo collection that receives the document.
        connection_timeout: seconds after which an ``OSError`` is escalated.

    Returns:
        The fetched document (tagged with ``cityId``). Returns ``{}`` when the
        fetch failed with an ``OSError`` inside the timeout window; in that
        case the city is skipped after a 5 second pause.

    Raises:
        Exception: if an ``OSError`` occurs more than ``connection_timeout``
            seconds after this request started.
    """
    # Initialise before the try so the final return never hits an unbound
    # local when the fetch itself fails.
    weatherData = {}
    current_req_start_time = time.time()
    try:
        weatherData = await get_weather_data(session, dynamicUrl)
        weatherData['cityId'] = cityId
        # NOTE(review): pymongo's insert_one is synchronous, so it blocks the
        # event loop while it runs.
        weatherCollection.insert_one(weatherData)
    except OSError as error:
        # One handler suffices: requests.ConnectionError (imported as
        # ConnectionError in this file), the builtin ConnectionError and
        # aiohttp.ClientOSError all derive from OSError.
        if time.time() > current_req_start_time + connection_timeout:
            raise Exception('Unable to get updates after {} seconds of ConnectionErrors'.format(connection_timeout)) from error
        await asyncio.sleep(5)
    return weatherData

async def extract_weather_data(session, apiKeys, cities, weatherMapUrl, weatherCollection, connection_timeout, max_concurrency=5):
    """Fetch and store forecasts for every city concurrently.

    One request is made per document in ``cities``. API keys are rotated
    round-robin across requests, and at most ``max_concurrency`` requests are
    in flight at a time.

    Args:
        session: ``aiohttp.ClientSession`` to use, or ``None`` to open a
            private session that is closed when all requests finish.
        apiKeys: non-empty sequence of OpenWeatherMap API keys to rotate.
        cities: pymongo collection whose documents carry ``cityId`` and
            ``location.latitude`` / ``location.longitude``.
        weatherMapUrl: URL template with ``{latitude}``, ``{longitude}`` and
            ``{apiKey}`` placeholders.
        weatherCollection: pymongo collection that receives the forecasts.
        connection_timeout: forwarded to ``save_weather_data``.
        max_concurrency: upper bound on simultaneous requests.
    """
    # The semaphore must be held per request. Acquiring it once around the
    # whole batch would bound nothing.
    sem = asyncio.Semaphore(max_concurrency)

    async def _bounded_save(httpSession, url, cityId):
        # Fetch and store one city while holding a concurrency slot.
        async with sem:
            return await save_weather_data(httpSession, url, cityId, weatherCollection, connection_timeout)

    async def _run(httpSession):
        # Build one job per city, rotating keys over however many were given.
        jobs = []
        for index, city in enumerate(cities.find()):
            apiKey = apiKeys[index % len(apiKeys)]
            location = city['location']
            dynamicUrl = weatherMapUrl.format(latitude=location['latitude'], longitude=location['longitude'], apiKey=apiKey)
            jobs.append(_bounded_save(httpSession, dynamicUrl, city['cityId']))
        await asyncio.gather(*jobs)

    if session is None:
        async with aiohttp.ClientSession() as ownSession:
            await _run(ownSession)
    else:
        await _run(session)

# Return the list of weather documents containing the forecasted weather data for each city

In [11]:
def fetchWeatherCollectionAsList(weatherCollection):
    """Return every document of ``weatherCollection`` as a list.

    The query uses an empty filter, which matches all documents.
    """
    cursor = weatherCollection.find({})
    return [document for document in cursor]

# Iterate over each city's weather JSON, normalize the nested array of daily forecasts into a pandas DataFrame, keep only the first 5 forecasted days, and drop unwanted columns

In [None]:
def preprocessingData(weathersOfCities, initialWeatherDf):
    """Flatten each city's daily forecasts into one row per forecast day.

    For every weather document, the ``daily`` array is normalized with
    ``json_normalize`` and the ``weather`` list is exploded. Only rows whose
    day index is 0-4 (the first five forecast days) are kept. Each kept row
    is enriched as follows:
    - the fields of its weather entry (``id``, ``main``, ``icon``,
      ``description``);
    - ``temp`` / ``feels_like`` taken from the ``.day`` values;
    - the city's ``latitude``, ``longitude`` and ``cityId``;
    - an ISO ``date`` derived from ``dt`` (local time zone).
    The nested/unused forecast columns are then dropped.

    Documents without a ``daily`` key (e.g. an API error payload stored
    instead of a forecast) are reported and skipped.

    Args:
        weathersOfCities: iterable of weather documents as stored in MongoDB.
        initialWeatherDf: frame the new rows are appended to.

    Returns:
        The combined frame, which is also stored in the module-level
        ``weatherDf``.
    """
    global weatherDf
    # Nested copies replaced by the flattened fields above, or not needed.
    # errors='ignore' below tolerates payloads that omit some of them.
    dropColumns = ['weather', 'feels_like.day', 'feels_like.night', 'feels_like.eve', 'feels_like.morn',
                   'moon_phase', 'moonset', 'moonrise', 'temp.day', 'temp.night', 'temp.eve',
                   'temp.morn', 'temp.min', 'temp.max']
    rows = []
    for weather in weathersOfCities:
        if 'daily' not in weather:
            print('Skipping weather document without daily forecast, cityId:', weather.get('cityId'))
            continue
        dailyDf = json_normalize(weather['daily']).explode('weather')
        for index, day in dailyDf.iterrows():
            # The index is the day offset from json_normalize (explode repeats
            # it when a day has several weather entries), so this keeps the
            # first five days.
            if index > 4:
                break
            condition = day['weather']
            day['id'] = condition['id']
            day['main'] = condition['main']
            day['icon'] = condition['icon']
            day['description'] = condition['description']
            day['temp'] = day['temp.day']
            day['feels_like'] = day['feels_like.day']
            day['latitude'] = weather['lat']
            day['longitude'] = weather['lon']
            day['cityId'] = weather['cityId']
            day.drop(dropColumns, inplace=True, errors='ignore')
            day["date"] = datetime.date.fromtimestamp(day['dt']).isoformat()
            rows.append(day.to_dict())

    # Build the frame once; DataFrame.append (removed in pandas 2.0) copied
    # the whole frame for every row.
    if not rows:
        weatherDf = initialWeatherDf
    elif initialWeatherDf.empty and initialWeatherDf.columns.empty:
        weatherDf = pd.DataFrame(rows)
    else:
        weatherDf = pd.concat([initialWeatherDf, pd.DataFrame(rows)], ignore_index=True)
    return weatherDf
                                              


# Summarize the transformed pandas DataFrame, fill null values for rain, snow, dew point and precipitation probability with zero, and convert the temperature from Kelvin to degrees Celsius

In [1]:
def performDataCleaning(weatherDataframe):
    """Fill missing precipitation/dew-point values and convert ``temp`` to Celsius.

    Prints a ``DataFrame.info()`` summary. Then, for each of ``rain``,
    ``snow``, ``dew_point`` and ``pop``, fills nulls with 0 or creates the
    column filled with 0 when it is missing entirely. This keeps the output
    schema stable even when no city reported rain or snow. Finally, ``temp``
    is converted from Kelvin to degrees Celsius.

    Args:
        weatherDataframe: frame produced by ``preprocessingData``; modified
            in place.

    Returns:
        The cleaned frame, which is also stored in the module-level
        ``weatherDf``.
    """
    global weatherDf
    weatherDataframe.info()

    for column in ('rain', 'snow', 'dew_point', 'pop'):
        if column in weatherDataframe.columns:
            weatherDataframe[column] = weatherDataframe[column].fillna(0)
        else:
            weatherDataframe[column] = 0.0

    # 0 degC == 273.15 K. Assumes the API returned Kelvin (the request URL
    # sets no `units` parameter). NOTE(review): feels_like and dew_point are
    # left in the API's unit.
    weatherDataframe['temp'] = weatherDataframe['temp'] - 273.15

    weatherDf = weatherDataframe
    return weatherDf
    

# Drop unwanted columns in transformed weather dataframe and rename the columns in the dataframe

In [4]:
def dropAndRenameColumnsWeatherDf(weatherDataframe):
    """Remove optional noise columns and rename the rest to the SQL schema.

    ``wind_gust``, ``icon`` and ``visibility`` are dropped when present (the
    list actually dropped is printed). The API field names are then renamed
    to the table's column names. Both steps modify ``weatherDataframe`` in
    place.

    Returns:
        The same frame object, which is also stored in the module-level
        ``weatherDf``.
    """
    global weatherDf

    unwanted = [col for col in ('wind_gust', 'icon', 'visibility') if col in weatherDataframe.columns]
    print('dropColumns', unwanted)
    weatherDataframe.drop(columns=unwanted, inplace=True)

    renames = {
        'id': 'weather_code',
        'main': 'weather_title',
        'description': 'weather_desc',
        'cityId': 'city_id',
        'dt': 'date_timestamp',
        'temp': 'temperature',
    }
    weatherDataframe.rename(columns=renames, inplace=True)

    weatherDf = weatherDataframe
    return weatherDf


# Convert the values in the dataframe to appropriate data types such as int, float and date, and split the year, month and day into separate columns

In [3]:
def convertWeatherDfDataTypes(weatherDataframe):
    """Cast the renamed weather columns to storage types and add date parts.

    Steps, all applied in place:
    - convert ``date`` to datetime;
    - cast identifier/timestamp columns to ``int64`` and measurement columns
      to ``float``;
    - add ``year``, ``month`` and ``day`` columns;
    - format ``date`` back into a ``YYYY-MM-DD`` string.

    Args:
        weatherDataframe: frame produced by ``dropAndRenameColumnsWeatherDf``
            (reads the renamed columns such as ``city_id``, ``weather_code``,
            ``temperature`` and ``date_timestamp``).

    Returns:
        The converted frame, which is also stored in the module-level
        ``weatherDf``. Returns ``None`` when ``weatherDataframe`` is empty; in
        that case nothing is modified and ``weatherDf`` is left untouched.
    """
    global weatherDf
    if weatherDataframe.empty:
        return None

    weatherDataframe['date'] = pd.to_datetime(weatherDataframe['date'])

    columnTypes = {
        'city_id': 'int64',
        'weather_code': 'int64',
        'latitude': float,
        'longitude': float,
        'wind_deg': float,
        'wind_speed': float,
        'clouds': float,
        'uvi': float,
        'dew_point': float,
        'humidity': float,
        'pressure': float,
        'feels_like': float,
        'temperature': float,
        'sunset': 'int64',
        'sunrise': 'int64',
        'date_timestamp': 'int64',
    }
    for column, dtype in columnTypes.items():
        # Each column is cast from itself (latitude was previously cast from
        # weather_code).
        weatherDataframe[column] = weatherDataframe[column].astype(dtype)

    # Derive the parts from this frame's own date column rather than the
    # module-level weatherDf.
    dateParts = weatherDataframe['date'].dt
    weatherDataframe['year'] = dateParts.year
    weatherDataframe['month'] = dateParts.month
    weatherDataframe['day'] = dateParts.day
    weatherDataframe['date'] = dateParts.strftime('%Y-%m-%d')

    weatherDf = weatherDataframe
    return weatherDf

# Upload the final weather dataframe to postgres database

In [22]:
def uploadWeatherDataToPostgres(postgresClient, weatherDf, collectionName):
    """Write ``weatherDf`` into the ``collectionName`` table, replacing it.

    Uses ``DataFrame.to_sql`` with multi-row INSERTs and the default index
    column. Any exception is printed instead of raised.

    Args:
        postgresClient: connectable accepted by ``DataFrame.to_sql``.
        weatherDf: frame to upload.
        collectionName: destination table name.
    """
    writeOptions = {'if_exists': 'replace', 'method': 'multi'}
    try:
        weatherDf.to_sql(collectionName, postgresClient, **writeOptions)
    except Exception as err:
        print("Error: ", err)


# Main method of this file encompassing method calls to all other methods in this file

In [23]:
async def weather_main(postgresClient, mongoClient, isWeatherCollectionExists):
    """Run the weather pipeline end to end: API -> MongoDB -> pandas -> PostgreSQL.

    Starting point of execution for this notebook. When the weather
    collection does not exist yet, forecasts are fetched from the
    OpenWeatherMap One Call API and stored in MongoDB. The stored documents
    are then flattened, cleaned, renamed, type-converted and written to
    PostgreSQL.

    Args:
        postgresClient: connectable passed to ``DataFrame.to_sql``
            (presumably a SQLAlchemy engine; the file imports create_engine).
        mongoClient: pymongo client used to look up the database.
        isWeatherCollectionExists: API extraction runs only when this
            compares equal to ``False``.

    Exceptions are caught and printed rather than propagated.
    """
    try:
        # NOTE(review): databaseName, cityTableName and weatherTableName are
        # not defined in this section; presumably notebook-level globals set
        # before this runs. Confirm.
        db = mongoClient[databaseName]
        cities = db[cityTableName]
        weatherCollection = db[weatherTableName]
        # No `units` parameter, so the API uses its default (standard) units.
        weatherMapUrl = "https://api.openweathermap.org/data/2.5/onecall?lat={latitude}&lon={longitude}&exclude=minutely,hourly&appid={apiKey}"
        connection_timeout = 30  # seconds
        # NOTE(review): these API keys are secrets committed to source; load
        # them from configuration or the environment instead.
        apiKeys = ['4060c67d06c701fa7b6fb2c075ce7b3a', 'f7b57c204082040afc460167c2a07941', '3af487e9bfe75ef34b67965653063c47']

        if isWeatherCollectionExists == False:  # noqa: E712 - keep exact comparison (None does not trigger)
            # extract_weather_data's first positional parameter is the HTTP
            # session. Passing None lets it open its own aiohttp session.
            # Omitting the argument raises TypeError.
            await extract_weather_data(None, apiKeys, cities, weatherMapUrl, weatherCollection, connection_timeout)
            print('Extracted weather data from API into mongo ----')

        weathersOfCities = fetchWeatherCollectionAsList(weatherCollection)
        print('Fetched weathers of cities --')

        processedWeatherDf = preprocessingData(weathersOfCities, pd.DataFrame())
        print('Processed weather dataframe ----')

        cleanedWeatherDf = performDataCleaning(processedWeatherDf)
        transformedWeatherDf = dropAndRenameColumnsWeatherDf(cleanedWeatherDf)
        convertedWeatherDf = convertWeatherDfDataTypes(transformedWeatherDf)
        if convertedWeatherDf is None:
            # convertWeatherDfDataTypes returns None for an empty frame.
            print('No weather rows to upload')
        else:
            uploadWeatherDataToPostgres(postgresClient, convertedWeatherDf, weatherTableName)
        print('weather data processing done')
    except Exception as error:
        print('Error while processing weather dataframe:', repr(error))
