In [1]:
! pip install geopy

Collecting geopy
  Using cached geopy-1.12.0-py2.py3-none-any.whl
Installing collected packages: geopy
Successfully installed geopy-1.12.0
[33mYou are using pip version 9.0.1, however version 9.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [2]:
import pandas as pd
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings("ignore")
from geopy.distance import vincenty
import numpy as np
import pyspark
from pyspark.sql import functions as f, SparkSession,SQLContext
from pyspark.sql.types import StructType, StructField, StringType

In [3]:
# getOrCreate reuses the SparkContext already running in this kernel, so the
# cell can be re-executed; constructing SparkContext(...) directly raises when
# a context is already active.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)

In [11]:
def cal_conditions(row):
    """Label a day's weather from its rainfall and temperature readings.

    `row` is any mapping-like object (dict, pandas Series) exposing
    'rainfall_mm', 'minimum_temperature' and 'maximum_temperature'.

    Rules are checked in order and the first match wins:
      * 'Rain'  - rainfall_mm >= 1.5
      * 'Snow'  - minimum_temperature <= 0 and maximum_temperature <= 10
      * 'Sunny' - maximum_temperature >= 30 and minimum_temperature <= 25
    Anything else is 'Moderate'.
    """
    # Each rule is a (label, predicate) pair; predicates are lambdas so the
    # fields are only read when that rule is actually evaluated.
    rules = (
        ('Rain', lambda r: r['rainfall_mm'] >= 1.5),
        ('Snow', lambda r: r['minimum_temperature'] <= 0 and r['maximum_temperature'] <= 10),
        ('Sunny', lambda r: r['maximum_temperature'] >= 30 and r['minimum_temperature'] <= 25),
    )
    for label, matches in rules:
        if matches(row):
            return label
    return 'Moderate'

def date_rolling_range(no_of_days,date_to_predict):
    """Return the [start, end] dates of a window around last year's date.

    The window is centred 365 days before `date_to_predict` (a 'YYYY-MM-DD'
    string) and extends `no_of_days` days to either side. Both bounds are
    returned as 'YYYY-MM-DD' strings in a two-element list.
    """
    fmt = '%Y-%m-%d'
    # A fixed 365-day shift is used, so the anchor drifts by one day when the
    # intervening year contains a 29 February.
    anchor = datetime.strptime(date_to_predict, fmt) - timedelta(days=365)
    half_window = timedelta(days=no_of_days)
    window_start = anchor - half_window
    window_end = anchor + half_window
    return [window_start.strftime(fmt), window_end.strftime(fmt)]

def mean_per_date_station(df,date_range, in_station_id):
    """Average one station's numeric readings over an inclusive date window.

    Args:
        df: frame with at least `station_id` and `date_formated` columns.
        date_range: two-element sequence [start, end]; both bounds are
            inclusive and are compared against `df.date_formated`.
        in_station_id: the station whose rows are averaged.

    Returns:
        A DataFrame indexed by `station_id` holding the mean of every numeric
        column; it has no rows when nothing falls inside the window.
    """
    in_window = (
        (df.station_id == in_station_id)
        & (df.date_formated >= date_range[0])
        & (df.date_formated <= date_range[1])
    )
    # numeric_only=True keeps text columns (e.g. station_name) out of the
    # aggregation; without it recent pandas versions raise TypeError instead
    # of silently dropping them.
    return df.loc[in_window].groupby(['station_id']).mean(numeric_only=True)

def predict_weather(station_id, date_to_predict):
    """Predict one station's weather for one date and append it to the output file.

    Uses the grand-mean method (https://en.wikipedia.org/wiki/Grand_mean):
    readings are averaged over windows of +/-3, +/-15 and +/-30 days around
    the date 365 days before `date_to_predict`, and the three window means
    are then averaged again.

    Args:
        station_id: weather station to predict for.
        date_to_predict: target date as a 'YYYY-MM-DD' string.

    Side effects:
        * reads /home/jovyan/data/weather_extract.csv on every call;
        * rewrites "staion_id.csv" with the station id/name list;
        * appends one '|'-separated, header-less row (station id, the seven
          averaged readings, condition label, predicted date) to
          /home/jovyan/data/prediction_weather.psv. Nothing is appended when
          the station has no history inside any of the windows.

    Returns:
        None.
    """
    history_weather = pd.read_csv("/home/jovyan/data/weather_extract.csv", index_col="date")
    df_station_list = history_weather[['station_id','station_name']].groupby(['station_id','station_name']).count()

    # Parse the date index in one vectorised call rather than row by row.
    history_weather['date_formated'] = pd.to_datetime(history_weather.index, format='%Y-%m-%d')

    window_means = [
        mean_per_date_station(history_weather, date_rolling_range(half_width, date_to_predict), station_id)
        for half_width in (3, 15, 30)
    ]
    union_df = pd.concat(window_means)
    final_prediction = union_df.groupby(union_df.index).mean()

    df_station_list.to_csv("staion_id.csv")

    if final_prediction.empty:
        # No history for this station in any window: there is no row to
        # label or write.
        return

    # .copy() gives an independent frame, so the two column assignments
    # below do not write into a slice of final_prediction.
    final_prediction_pred = final_prediction[['Lat', 'Lon','rainfall_mm','3pm_relative_humidity','3pm_msl_pressure_hpa', 'minimum_temperature', 'maximum_temperature']].copy()
    final_prediction_pred["Conditions"] = final_prediction_pred.apply(cal_conditions,axis=1)
    final_prediction_pred["predicted_date"] = date_to_predict

    # Append mode: every call adds a row, so re-running the same dates
    # duplicates them in the output file.
    with open('/home/jovyan/data/prediction_weather.psv', 'a') as out_file:
        final_prediction_pred.to_csv(out_file, header=False, sep='|')

In [12]:
# Run predict_weather for every station on each day of the forecast window,
# timing the whole run.
start_time = datetime.now()
strat_date = '2018-01-01'
no_of_days = 365
station_id_list = [3003, 9021, 9999, 12038, 18192, 23090, 24048, 26021, 31011]

# Parse the start date once, then derive each day as an offset from it.
first_day = datetime.strptime(strat_date, '%Y-%m-%d')
date_list = [(first_day + timedelta(days=offset)).strftime('%Y-%m-%d')
             for offset in range(no_of_days)]

for day in date_list:
    for station in station_id_list:
        predict_weather(station, day)

end_time = datetime.now()
time_delta = end_time - start_time

In [13]:
# total_seconds() includes the days component that .seconds drops, so runs
# longer than 24 hours are reported correctly; int() trims the microseconds.
print("Time taken to run the prediction - " + str(timedelta(seconds=int(time_delta.total_seconds()))))

Time taken to run the prediction - 0:09:30


In [14]:
#Geospatial calculations

In [15]:
# Load the airport reference table and the predictions appended by
# predict_weather. The prediction file has no header row, so the column
# names are assigned here.
df_airport = pd.read_csv("/home/jovyan/data/aus_airport.csv")
col = ['staion_id','Lat', 'Lon','rainfall_mm','3pm_relative_humidity','3pm_msl_pressure_hpa', 'minimum_temperature', 'maximum_temperature', 'Condition', 'date']
prediction_weather_staion_id = pd.read_csv("/home/jovyan/data/prediction_weather.psv", header=None,sep='|')
prediction_weather_staion_id.columns = col

# One row per distinct station id/coordinate combination, used for the
# distance calculations.
weather_stations = prediction_weather_staion_id[['staion_id','Lat', 'Lon']].drop_duplicates()

# Expose the predictions to Spark SQL. createOrReplaceTempView replaces the
# deprecated registerTempTable and is safe to re-run.
df_prediction_weather_staion = spark.createDataFrame(prediction_weather_staion_id)
df_prediction_weather_staion.createOrReplaceTempView("df_prediction_weather_staion")

In [16]:
# Renumber the stations 0..n-1. reset_index returns a new frame, so the result
# must be bound back to the name for the renumbering to take effect.
weather_stations = weather_stations.reset_index(drop=True)

# Store each location as a (latitude, longitude) tuple for the distance call.
weather_stations['station_coords'] = list(zip(weather_stations['Lat'], weather_stations['Lon']))
df_airport['airport_coords'] = list(zip(df_airport['Latitude'], df_airport['Longitude']))


In [17]:
# Distance in km between every airport and every weather station.
# Columns are read once and iterated directly, which avoids building a row
# Series per pair and avoids positional Series[int] lookups on label-indexed
# rows (deprecated in recent pandas).
# The first two airport columns are taken by position because their header
# names are not referenced anywhere in this notebook.
airport_names = df_airport.iloc[:, 0]
airport_codes = df_airport.iloc[:, 1]

# Airports form the outer loop and stations the inner loop, so rows are
# grouped by airport.
# NOTE(review): vincenty is only available in geopy 1.x.
my_lst_station_airport = [
    [station_id, airport_name, iata_code, vincenty(station_xy, airport_xy).km]
    for airport_name, iata_code, airport_xy in zip(airport_names, airport_codes, df_airport['airport_coords'])
    for station_id, station_xy in zip(weather_stations['staion_id'], weather_stations['station_coords'])
]

# Passing columns= to the constructor also labels an empty result correctly.
df_station_airport = pd.DataFrame(
    my_lst_station_airport,
    columns=['station_id', 'Airport_name', 'IATA_code','Distance'],
)

In [18]:
# Rank the stations for each airport by distance and keep the closest one.
# createOrReplaceTempView replaces the deprecated registerTempTable and lets
# the cell be re-run without leaving a stale table behind.
df_1 = spark.createDataFrame(df_station_airport)
df_1.createOrReplaceTempView("df_1")

# rnA numbers the stations of each IATA_code from nearest (1) to farthest.
df_2 = spark.sql("""
SELECT 
station_id,
Airport_name,
IATA_code,Distance,
ROW_NUMBER() OVER (PARTITION BY 
                            IATA_code
                            ORDER BY 
                            Distance) AS rnA
FROM 
df_1
""")

# One row per airport: its nearest weather station.
df_3 = df_2.filter("rnA =1")
df_3.createOrReplaceTempView("df_3")

In [19]:
# Build the per-airport forecast by joining each airport's nearest station
# (df_3) to that station's daily predictions (df_prediction_weather_staion).
# - Station is the airport's IATA code.
# - Lat/Lon come from the prediction table, i.e. they are the weather
#   station's coordinates, not the airport's.
# - Temperature is the predicted maximum temperature; Pressure and Humidity
#   are the 3pm readings.
# Rows are ordered by prediction date.
final_psv_output = spark.sql("""
SELECT 
 IATA_code as Station
,Lat
,Lon
,date as  Local_Time
,Condition
,maximum_temperature as Temperature
,3pm_msl_pressure_hpa as Pressure 
,3pm_relative_humidity as Humidity

FROM 
df_3 A Inner join df_prediction_weather_staion B on a.station_id = B.staion_id
order by date
""")


In [20]:
# Persist the per-airport forecast as '|'-separated text, replacing whatever
# a previous run left at this path.
final_psv_output.write.mode("overwrite").csv('data/IATA_weather_prediction.psv', sep='|')