In [1]:
import os
# set output path
# os.chdir("")

In [2]:
import errno
import time
import hashlib
from multiprocessing.pool import ThreadPool
from typing import Callable
import numpy as np
import pandas as pd


class Core:

    """
    Base class that provides features which are used across the package.

    Subclasses are expected to define a ``cache_subdir`` class attribute,
    which ``clear_cache`` relies on.
    """

    # Base URL of the Meteostat bulk data interface
    endpoint: str = 'https://bulk.meteostat.net/v2/'

    # Location of the cache directory
    cache_dir: str = os.path.expanduser(
        '~') + os.sep + '.meteostat' + os.sep + 'cache'

    # Maximum age of a cached file in seconds
    max_age: int = 24 * 60 * 60

    # Maximum number of threads used for downloading files
    max_threads: int = 1

    def _get_file_path(
        self,
        subdir: str,
        path: str
    ) -> str:
        """
        Get the local cache file path for a remote path.

        The file name is the MD5 hex digest of ``path``, so arbitrary remote
        paths map to flat, filesystem-safe names inside ``subdir``.
        """

        file = hashlib.md5(path.encode('utf-8')).hexdigest()

        return os.path.join(self.cache_dir, subdir, file)

    def _file_in_cache(
        self,
        path: str
    ) -> bool:
        """
        Check whether a fresh copy of ``path`` exists in the local cache.

        Creates the containing directory if it does not exist yet. Returns
        True only if ``path`` is a regular file whose modification time is
        at most ``max_age`` seconds in the past.

        Raises:
            Exception: if the cache directory cannot be created.
        """

        directory = os.path.dirname(path)

        if not os.path.exists(directory):
            try:
                # exist_ok tolerates a concurrent creator of the same directory
                os.makedirs(directory, exist_ok=True)
            except OSError as creation_error:
                raise Exception(
                    'Cannot create cache directory') from creation_error

        return os.path.isfile(path) and \
            time.time() - os.path.getmtime(path) <= self.max_age

    @staticmethod
    def _processing_handler(
        datasets: list,
        load: Callable[..., None],
        max_threads: int
    ) -> None:
        """
        Call ``load(*dataset)`` for every argument tuple in ``datasets``.

        Runs sequentially when ``max_threads`` is below 2; otherwise the calls
        are distributed over a thread pool of that size and this method waits
        until all of them have finished.
        """

        if max_threads < 2:
            for dataset in datasets:
                load(*dataset)
            return

        pool = ThreadPool(max_threads)
        try:
            pool.starmap(load, datasets)
        finally:
            # Release the worker threads even if one of the loads raised
            pool.close()
            pool.join()

    def _load_handler(
        self,
        path: str,
        columns: list,
        types: dict,
        parse_dates: list,
        coerce_dates: bool = False
    ) -> pd.DataFrame:
        """
        Read a gzip-compressed CSV file from the Meteostat endpoint.

        Args:
            path: File path relative to ``endpoint``.
            columns: Column names for the CSV (passed as ``names``).
            types: Column dtypes passed to ``pd.read_csv``; its keys are also
                the columns of the empty fallback frame.
            parse_dates: Passed through to ``pd.read_csv``. When
                ``coerce_dates`` is set it must be a list of positional
                column indices.
            coerce_dates: Re-parse the date columns with ``errors='coerce'``.

        Returns:
            The parsed DataFrame, or an empty DataFrame with the ``types``
            columns if downloading or parsing fails.
        """

        try:

            df = pd.read_csv(
                self.endpoint + path,
                compression='gzip',
                names=columns,
                dtype=types,
                parse_dates=parse_dates)

            # Force datetime conversion; unparsable values become NaT
            if coerce_dates:
                df.iloc[:, parse_dates] = df.iloc[:, parse_dates].apply(
                    pd.to_datetime, errors='coerce')

        except Exception:

            # Best effort: a missing or unreadable file yields an empty frame.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            df = pd.DataFrame(columns=[*types])

        return df

    @staticmethod
    def _validate_series(
        df: pd.DataFrame,
        station: str
    ) -> pd.DataFrame:
        """
        Index a station's time series by (station, time).

        Adds an empty ``time`` column if it is missing (e.g. for the empty
        fallback frame) and a constant ``station`` column.
        """

        if 'time' not in df.columns:
            df['time'] = None

        df['station'] = station

        return df.set_index(['station', 'time'])

    @staticmethod
    def _weighted_average(step: pd.DataFrame) -> pd.DataFrame:
        """
        Calculate a column-wise weighted average of a group of rows.

        NaN values are masked so they do not contribute. Rows are weighted by
        the ``score`` column when it is present (the per-station score joined
        in by point interpolation); otherwise the last column is the weight.

        Returns:
            A one-row DataFrame with the same columns as ``step``; columns
            without any valid value are NaN.
        """

        data = np.ma.masked_array(step, np.isnan(step))

        if 'score' in step.columns:
            weights = data[:, step.columns.get_loc('score')]
        else:
            weights = data[:, -1]

        data = np.ma.average(data, axis=0, weights=weights)
        data = data.filled(np.nan)

        return pd.DataFrame(data=[data], columns=step.columns)

    @classmethod
    def clear_cache(
        cls,
        max_age: int = None
    ) -> None:
        """
        Delete cached files older than ``max_age`` seconds.

        Only regular files directly inside ``cache_dir/cache_subdir`` are
        considered. ``max_age`` defaults to the class-level ``max_age``.

        Raises:
            Exception: wrapping any error raised while clearing.
        """

        try:

            directory = os.path.join(cls.cache_dir, cls.cache_subdir)

            if os.path.exists(directory):

                if max_age is None:
                    max_age = cls.max_age

                now = time.time()

                for file in os.listdir(directory):

                    path = os.path.join(directory, file)

                    # isfile first, so sub-directories are never touched
                    if os.path.isfile(path) and \
                            now - os.path.getmtime(path) > max_age:
                        os.remove(path)

        except Exception as clear_error:
            raise Exception('Cannot clear cache') from clear_error

    @staticmethod
    def _degree_mean(data: pd.Series):
        """
        Return the circular mean of a series of angles in degrees, in [0, 360).

        Averaging the unit vectors avoids the wrap-around problem of a plain
        arithmetic mean (e.g. 350 and 10 average to ~0, not 180).
        """

        rads = np.deg2rad(data)
        sums = np.arctan2(np.sum(np.sin(rads)), np.sum(np.cos(rads)))
        return (np.rad2deg(sums) + 360) % 360


In [3]:
import os
from math import cos, sqrt, radians
from copy import copy
from datetime import datetime, timedelta
from typing import Union
import pandas as pd


class Stations(Core):

    """
    Select weather stations from the full list of stations.

    Every filter/sort method returns a new ``Stations`` instance and leaves
    the instance it was called on unchanged.
    """

    # The cache subdirectory
    cache_subdir: str = 'stations'

    # The list of selected weather Stations
    stations = None

    # Raw data columns
    _columns: list = [
        'id',
        'name',
        'country',
        'region',
        'wmo',
        'icao',
        'latitude',
        'longitude',
        'elevation',
        'timezone',
        'hourly_start',
        'hourly_end',
        'daily_start',
        'daily_end'
    ]

    # Processed data columns with types
    _types: dict = {
        'id': 'string',
        'name': 'object',
        'country': 'string',
        'region': 'string',
        'wmo': 'string',
        'icao': 'string',
        'latitude': 'float64',
        'longitude': 'float64',
        'elevation': 'float64',
        'timezone': 'string'
    }

    # Positional indices of the inventory date columns (hourly/daily start/end)
    _parse_dates: list = [10, 11, 12, 13]

    def _load(self) -> None:
        """
        Load the station list, from the local cache when fresh, otherwise
        from Meteostat (and then write it to the cache if caching is enabled).
        """

        file = 'stations/lib.csv.gz'

        path = self._get_file_path(self.cache_subdir, file)

        if self.max_age > 0 and self._file_in_cache(path):

            # Cache files are written by this class only (see below)
            df = pd.read_pickle(path)

        else:

            df = self._load_handler(
                file,
                self._columns,
                self._types,
                self._parse_dates,
                True)

            df = df.set_index('id')

            if self.max_age > 0:
                df.to_pickle(path)

        self.stations = df

    def __init__(self) -> None:

        # Get all weather stations
        self._load()

        # Remove expired cache files
        if self.max_age > 0:
            self.clear_cache()

    def id(
        self,
        organization: str,
        code: str
    ) -> 'Stations':
        """
        Select weather stations by identifier.

        ``organization == 'meteostat'`` matches against the index (Meteostat
        IDs); any other value is used as the column name to match (e.g.
        ``'wmo'`` or ``'icao'``). ``code`` may be a single string or a list.
        """

        temp = copy(self)

        if isinstance(code, str):
            code = [code]

        if organization == 'meteostat':
            temp.stations = temp.stations[temp.stations.index.isin(code)]
        else:
            temp.stations = temp.stations[temp.stations[organization].isin(
                code)]

        return temp

    def nearby(
        self,
        lat: float,
        lon: float,
        radius: int = None
    ) -> 'Stations':
        """
        Sort (and optionally filter) weather stations by distance to a point.

        Adds a ``distance`` column in metres, computed with the
        equirectangular approximation, drops stations farther than ``radius``
        metres when given, and sorts ascending by distance.
        """

        temp = copy(self)

        # Earth radius in m
        earth_radius = 6371000

        station_lat = np.radians(temp.stations['latitude'])
        station_lon = np.radians(temp.stations['longitude'])

        x = (radians(lon) - station_lon) * \
            np.cos(0.5 * (radians(lat) + station_lat))
        y = radians(lat) - station_lat

        # assign() builds a new frame; copy(self) is shallow, so writing a
        # column in place would also modify the caller's DataFrame
        temp.stations = temp.stations.assign(
            distance=earth_radius * np.sqrt(x * x + y * y))

        if radius is not None:
            temp.stations = temp.stations[temp.stations['distance'] <= radius]

        temp.stations = temp.stations.sort_values('distance')

        return temp

    def region(
        self,
        country: str,
        state: str = None
    ) -> 'Stations':
        """
        Filter weather stations by country code and, optionally, region code.
        """

        temp = copy(self)

        temp.stations = temp.stations[temp.stations['country'] == country]

        if state is not None:
            temp.stations = temp.stations[temp.stations['region'] == state]

        return temp

    def bounds(
        self,
        top_left: tuple,
        bottom_right: tuple
    ) -> 'Stations':
        """
        Filter weather stations by geographical bounds.

        Both corners are ``(latitude, longitude)`` tuples; bounds are
        inclusive.
        """

        temp = copy(self)

        temp.stations = temp.stations[
            (temp.stations['latitude'] <= top_left[0]) &
            (temp.stations['latitude'] >= bottom_right[0]) &
            (temp.stations['longitude'] <= bottom_right[1]) &
            (temp.stations['longitude'] >= top_left[1])
        ]

        return temp

    def inventory(
        self,
        granularity: str,
        required: Union[bool, datetime, tuple]
    ) -> 'Stations':
        """
        Filter weather stations by inventory data.

        Args:
            granularity: Inventory prefix, e.g. ``'hourly'`` or ``'daily'``.
            required: ``True`` keeps stations with any data; a
                ``(start, end)`` tuple keeps stations covering that period;
                any other value is treated as a single date that must be
                covered.
        """

        temp = copy(self)

        starts = temp.stations[granularity + '_start']
        ends = temp.stations[granularity + '_end']
        has_data = starts.notna()

        # NOTE(review): the end date is extended by max_age, presumably to
        # tolerate an inventory that is up to max_age old — confirm
        tolerance = timedelta(seconds=temp.max_age)

        if required is True:
            mask = has_data
        elif isinstance(required, tuple):
            mask = has_data & (starts <= required[0]) & \
                (ends + tolerance >= required[1])
        else:
            mask = has_data & (starts <= required) & \
                (ends + tolerance >= required)

        temp.stations = temp.stations[mask]

        return temp

    def convert(
        self,
        units: dict
    ) -> 'Stations':
        """
        Convert columns to a different unit.

        ``units`` maps column names to callables applied element-wise;
        unknown columns are ignored.
        """

        temp = copy(self)

        # Work on a copy: copy(self) is shallow and shares the DataFrame
        converted = temp.stations.copy()

        for parameter, unit in units.items():
            if parameter in converted.columns:
                converted[parameter] = converted[parameter].apply(unit)

        temp.stations = converted

        return temp

    def count(self) -> int:
        """
        Return number of weather stations in current selection
        """

        return len(self.stations.index)

    def fetch(
        self,
        limit: int = None,
        sample: bool = False
    ) -> pd.DataFrame:
        """
        Fetch a copy of all selected weather stations, or of the first
        ``limit`` (randomly sampled ones when ``sample`` is set).
        """

        temp = copy(self.stations)

        if sample and limit:
            return temp.sample(limit)

        if limit:
            return temp.head(limit)

        return temp


In [7]:
"""
Point Class

Meteorological data provided by Meteostat (https://dev.meteostat.net)
under the terms of the Creative Commons Attribution-NonCommercial
4.0 International Public License.

The code is licensed under the MIT license.
"""

from datetime import datetime
from meteostat import Stations


class Point:

    """
    Automatically select weather stations by geographic location
    """

    # The interpolation method (weighted or nearest)
    method: str = 'weighted'

    # Maximum radius for nearby stations
    radius: int = 35000

    # Maximum difference in altitude
    alt_range: int = 350

    # Maximum number of stations
    max_count: int = 4

    # Adapt temperature data based on altitude
    adapt_temp: bool = True

    # Distance Weight
    weight_dist: float = 0.6

    # Altitude Weight
    weight_alt: float = 0.4

    # The latitude
    lat: float = None

    # The longitude
    lon: float = None

    # The altitude
    alt: int = None

    def __init__(
        self,
        lat: float,
        lon: float,
        alt: int = None
    ) -> None:

        self.lat = lat
        self.lon = lon
        self.alt = alt

        # Without a known altitude, temperature adaptation is disabled
        if alt is None:
            self.adapt_temp = False

    def _score_stations(self, stations: pd.DataFrame) -> pd.DataFrame:
        """
        Filter stations by altitude and rank them by score.

        ``stations`` must have ``distance`` (same unit as ``radius``) and
        ``elevation`` columns. Stations whose elevation differs from
        ``self.alt`` by more than ``alt_range`` are dropped. The rest get a
        ``score`` blending normalized distance and altitude difference
        (weighted by ``weight_dist``/``weight_alt``). Returns at most
        ``max_count`` rows, best score first, as a new DataFrame; the input
        is not modified.
        """

        alt_diff = abs(self.alt - stations['elevation'])
        in_range = alt_diff <= self.alt_range

        score = ((1 - (stations['distance'] / self.radius)) * self.weight_dist) + (
            (1 - (alt_diff / self.alt_range)) * self.weight_alt)

        # assign() on the filtered frame avoids writing into a slice
        ranked = stations[in_range].assign(score=score[in_range])

        return ranked.sort_values('score', ascending=False).head(self.max_count)

    def get_stations(self, granularity: str, start: datetime, end: datetime):
        """
        Get the best-scoring nearby weather stations.

        Side effect: if no altitude was given, ``self.alt`` is set to the
        mean elevation of the ``max_count`` nearest stations.
        """

        stations = Stations()
        stations = stations.nearby(self.lat, self.lon, self.radius)

        if self.alt is None:
            self.alt = stations.fetch().head(self.max_count)[
                'elevation'].mean()

        # The inventory filter needs a complete period; comparing against
        # None would raise, so skip it when start or end is missing
        if start is not None and end is not None:
            stations = stations.inventory(granularity, (start, end))

        return self._score_stations(stations.fetch())


In [8]:
import os
from copy import copy
from datetime import datetime
from typing import Union
from numpy import NaN
import numpy as np
import pandas as pd



class Daily(Core):

    """
    Retrieve daily weather observations for one or multiple weather stations or
    a single geographical point
    """

    # The cache subdirectory
    cache_subdir: str = 'daily'

    # The list of weather Stations (a pandas Index of station IDs once set)
    stations = None

    # The start date
    start: datetime = None

    # The end date
    end: datetime = None

    # Include model data?
    model: bool = True

    # The data frame (class-level empty default; instances rebind it)
    data = pd.DataFrame()

    # Raw CSV columns; column 0 is parsed into 'time' (see _parse_dates)
    _columns: list = [
        'date',
        'tavg',
        'tmin',
        'tmax',
        'prcp',
        'snow',
        'wdir',
        'wspd',
        'wpgt',
        'pres',
        'tsun'
    ]

    # Data types
    _types: dict = {
        'tavg': 'float64',
        'tmin': 'float64',
        'tmax': 'float64',
        'prcp': 'float64',
        'snow': 'float64',
        'wdir': 'float64',
        'wspd': 'float64',
        'wpgt': 'float64',
        'pres': 'float64',
        'tsun': 'float64'
    }

    # Columns for date parsing
    _parse_dates: dict = {
        'time': [0]
    }

    # Default aggregation functions
    _aggregations: dict = {
        'tavg': 'mean',
        'tmin': 'min',
        'tmax': 'max',
        'prcp': 'sum',
        'snow': 'mean',
        'wdir': Core._degree_mean,
        'wspd': 'mean',
        'wpgt': 'max',
        'pres': 'mean',
        'tsun': 'sum'
    }

    def _load(
        self,
        station: str
    ) -> None:
        """
        Load one station's file (cache or Meteostat), restrict it to
        [start, end] when both are set, and append it to ``self.data``.
        """

        file = 'daily/' + ('full' if self.model else 'obs') + \
            '/' + station + '.csv.gz'

        path = self._get_file_path(self.cache_subdir, file)

        if self.max_age > 0 and self._file_in_cache(path):

            # Cache files are written by this class only (see below)
            df = pd.read_pickle(path)

        else:

            df = self._load_handler(
                file,
                self._columns,
                self._types,
                self._parse_dates)

            df = self._validate_series(df, station)

            if self.max_age > 0:
                df.to_pickle(path)

        if self.start and self.end:
            times = df.index.get_level_values('time')
            df = df.loc[(times >= self.start) & (times <= self.end)]

        # DataFrame.append was removed in pandas 2.0; pd.concat replaces it.
        # NOTE(review): with max_threads > 1 concurrent loads may race on this
        # read-modify-write of self.data.
        self.data = pd.concat([self.data, df])

    def _get_data(self) -> None:
        """
        Load the data of every station in ``self.stations``.
        """

        if len(self.stations) > 0:

            datasets = [(str(station),) for station in self.stations]

            self._processing_handler(datasets, self._load, self.max_threads)

        else:

            self.data = pd.DataFrame(columns=[*self._types])

    def _resolve_point(
        self,
        method: str,
        stations: pd.DataFrame,
        alt: int,
        adapt_temp: bool
    ) -> None:
        """
        Project weather station data onto a single point.

        ``'nearest'`` keeps, per day and column, the first non-null value in
        the order the station data was appended. Any other method computes a
        score-weighted average per day; ``wdir`` takes the first value
        instead of being averaged. The result is re-indexed under the
        placeholder station ID ``'XXXXX'``.
        """

        if self.stations.size == 0:
            return None

        if method == 'nearest':

            self.data = self.data.groupby(
                pd.Grouper(level='time', freq='1D')).agg('first')

        else:

            # 'score' is joined as the LAST column: _weighted_average uses it
            # as the weight (by name, or as the last column)
            data = self.data.join(
                stations[['elevation', 'score']], on='station')

            # Shift temperatures by 2/3 degree per 100 m of altitude
            # difference between station and point (NaN stays NaN)
            if adapt_temp:
                offset = (2 / 3) * ((data['elevation'] - alt) / 100)
                for column in ('tavg', 'tmin', 'tmax'):
                    data[column] = data[column] + offset

            # Wind direction cannot be averaged linearly; take first value
            excluded = data['wdir'].groupby(
                pd.Grouper(level='time', freq='1D')).agg('first')

            data = data.groupby(
                pd.Grouper(level='time', freq='1D')).apply(self._weighted_average)

            # Drop the RangeIndex level added by the one-row group results
            data.index = data.index.droplevel(1)

            data['wdir'] = excluded

            self.data = data.drop(['score', 'elevation'], axis=1).round(1)

        # Set placeholder station ID
        self.data['station'] = 'XXXXX'
        self.data = self.data.set_index(
            ['station', self.data.index.get_level_values('time')])
        self.stations = pd.Index(['XXXXX'])

    def __init__(
        self,
        loc: Union[pd.DataFrame, Point, list, str],
        start: datetime = None,
        end: datetime = None,
        model: bool = True
    ) -> None:

        # Set list of weather stations
        if isinstance(loc, pd.DataFrame):
            self.stations = loc.index
        elif isinstance(loc, Point):
            # Daily data needs stations whose *daily* inventory covers the period
            stations = loc.get_stations('daily', start, end)
            self.stations = stations.index
        else:
            if not isinstance(loc, list):
                loc = [loc]

            self.stations = pd.Index(loc)

        self.start = start
        self.end = end
        self.model = model

        self._get_data()

        if isinstance(loc, Point):
            self._resolve_point(loc.method, stations, loc.alt, loc.adapt_temp)

        if self.max_age > 0:
            self.clear_cache()

    def normalize(self) -> 'Daily':
        """
        Return a copy whose data has a row for every station and every day
        between ``start`` and ``end`` (missing days filled with NaN).
        """

        temp = copy(self)

        value_columns = temp._columns[1:]
        dates = pd.date_range(temp.start, temp.end, freq='1D')

        # Collect the frames and concatenate once (avoids quadratic concat)
        frames = [pd.DataFrame(columns=value_columns)]

        for station in temp.stations:
            df = pd.DataFrame(columns=value_columns)
            df['time'] = dates
            df['station'] = station
            for column in value_columns:
                df[column] = np.nan
            frames.append(df)

        result = pd.concat(frames, axis=0)

        result = result.set_index(['station', 'time'])

        # Real observations come first, so first() prefers them over NaN rows
        temp.data = pd.concat([temp.data, result], axis=0).groupby(
            ['station', 'time'], as_index=True).first()

        # None -> NaN
        temp.data = temp.data.fillna(np.nan)

        return temp

    def interpolate(
        self,
        limit: int = 3
    ) -> 'Daily':
        """
        Linearly interpolate gaps of up to ``limit`` values per station.
        """

        temp = copy(self)

        # group_keys=False keeps the (station, time) index; pandas >= 2.0
        # would otherwise prepend a duplicate 'station' level
        temp.data = temp.data.groupby('station', group_keys=False).apply(
            lambda group: group.interpolate(
                method='linear', limit=limit, limit_direction='both', axis=0))

        return temp

    def aggregate(
        self,
        freq: str = '1D',
        spatial: bool = False
    ) -> 'Daily':
        """
        Aggregate observations per station to ``freq`` using
        ``_aggregations``; with ``spatial`` also average across stations.
        """

        temp = copy(self)

        temp.data = temp.data.groupby(['station', pd.Grouper(
            level='time', freq=freq)]).agg(temp._aggregations)

        if spatial:
            temp.data = temp.data.groupby(
                [pd.Grouper(level='time', freq=freq)]).mean()

        temp.data = temp.data.round(1)

        return temp

    def convert(
        self,
        units: dict
    ) -> 'Daily':
        """
        Convert columns to a different unit.

        ``units`` maps column names to callables applied element-wise;
        columns that are unknown or absent from the data are ignored.
        """

        temp = copy(self)

        # Work on a copy: copy(self) is shallow and shares the DataFrame
        converted = temp.data.copy()

        for parameter, unit in units.items():
            if parameter in temp._columns and parameter in converted.columns:
                converted[parameter] = converted[parameter].apply(unit)

        temp.data = converted

        return temp

    def coverage(
        self,
        parameter: str = None
    ) -> float:
        """
        Calculate data coverage (overall or by parameter) as the ratio of
        available rows/values to the number of days in [start, end].
        """

        expect = (self.end - self.start).days + 1

        if parameter is None:
            return len(self.data.index) / expect

        return self.data[parameter].count() / expect

    def count(self) -> int:
        """
        Return number of rows in DataFrame
        """

        return len(self.data.index)

    def fetch(self) -> pd.DataFrame:
        """
        Return a copy of the data; the station index level is dropped when
        only one station is selected.
        """

        temp = copy(self.data)

        if len(self.stations) == 1 and 'station' in temp.index.names:
            temp = temp.reset_index(level='station', drop=True)

        return temp


In [9]:
from datetime import datetime
import matplotlib.pyplot as plt
import pandas as pd

In [11]:
# Fetch the event list from S3. The loop further down reads each row
# positionally as (country, latitude, longitude, year, month).
project_dir = "https://s3groupaustralia.s3.eu-central-1.amazonaws.com/data/temperature/"

df = pd.read_csv(project_dir + 'newweather.csv').values.tolist()

In [12]:
# For every event, summarize the monthly weather of the six nearest
# stations (2000-2020) and flag the event month, then write everything to
# one Excel file.
start = datetime(2000, 1, 1)
end = datetime(2020, 12, 31)

# Loop-invariant: load the station list once instead of once per event
stations = Stations()

frames = []

for row in df:
    country, lat, lon, year, month = row[:5]

    station = stations.nearby(lat, lon).fetch(6)

    data = Daily(station, start, end).aggregate('M').fetch()
    data['datetime'] = data.index.get_level_values('time')
    data = data.reset_index(drop=True)

    # Combine the stations per month in one pass
    final = data.groupby('datetime', as_index=False).agg(
        tavg=('tavg', 'mean'),
        tmin=('tmin', 'min'),
        tmax=('tmax', 'max'),
        prcp=('prcp', 'mean'),
        wspd=('wspd', 'mean'),
    )
    final.insert(0, 'country', country)

    # '1' marks the event month, '0' every other month. The column label 0
    # keeps the layout of the previously generated spreadsheet.
    final[0] = [
        '1' if (moment.year == year and moment.month == month) else '0'
        for moment in final['datetime']
    ]

    frames.append(final)

# pd.concat replaces DataFrame.append (removed in pandas 2.0); the result is
# no longer bound to `all`, which shadowed the builtin
combined = pd.concat(frames) if frames else pd.DataFrame()
combined.to_excel('newfinalweather.xlsx')
        