In [2]:
from ftplib import FTP
from pathlib import Path

import pandas as pd
import geopandas as gpd
from tqdm.notebook import trange, tqdm

import dask.bag as db
import dask.dataframe as dd
import dask.diagnostics as dg

In [3]:
GHCNP = Path('ghcn_files')
GHCNP.mkdir(parents=True, exist_ok=True)
station_file = GHCNP.joinpath('ghcnd-stations.txt')

In [4]:
with FTP("ftp.ncdc.noaa.gov") as ftp:
    ftp.login()
    
    ftp.cwd('pub/data/ghcn/daily/')
    
    with open(station_file, 'wb') as stations:
        ftp.retrbinary(f'RETR {station_file.name}', stations.write) 
    ftp.cwd("by_year/")
    files = ftp.nlst()
    print(f'{len(files)} files')
    
    for filename in tqdm(files):
        fpath = Path(GHCNP.joinpath(filename))
        if not fpath.exists():
            with open(fpath, 'wb') as myfile:
                ftp.retrbinary('RETR '+ filename, myfile.write)
                

261 files


HBox(children=(FloatProgress(value=0.0, max=261.0), HTML(value='')))




In [5]:
columns = {"ID": (0,11),
           "LATITUDE": (12, 20), 
           "LONGITUDE": (21, 30),
           "ELEVATION": (31, 37),
           "STATE": (38, 40),
           "NAME": (41, 71),
           "GSN FLAG": (72, 75),
           "HCN/CRN FLAG": (76, 79),
           "WMO ID": (80, 85)}

In [6]:
with open(station_file) as f:
    lines = f.readlines()

In [7]:
def parse_row(line):
    return dict([(col, line[start:stop]) for col, (start, stop) in columns.items()])
with dg.ProgressBar():
    rows = db.from_sequence(lines).map(parse_row).compute()

[########################################] | 100% Completed |  2.3s


In [8]:
rows[0]

{'ID': 'ACW00011604',
 'LATITUDE': ' 17.1167',
 'LONGITUDE': ' -61.7833',
 'ELEVATION': '  10.1',
 'STATE': '  ',
 'NAME': 'ST JOHNS COOLIDGE FLD         ',
 'GSN FLAG': '   ',
 'HCN/CRN FLAG': '   ',
 'WMO ID': '     '}

In [9]:
dfst = pd.DataFrame(rows)
dfst.head()

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN FLAG,HCN/CRN FLAG,WMO ID
0,ACW00011604,17.1167,-61.7833,10.1,,ST JOHNS COOLIDGE FLD,,,
1,ACW00011647,17.1333,-61.7833,19.2,,ST JOHNS,,,
2,AE000041196,25.333,55.517,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0
3,AEM00041194,25.255,55.364,10.4,,DUBAI INTL,,,41194.0
4,AEM00041217,24.433,54.651,26.8,,ABU DHABI INTL,,,41217.0


* ID = 11 character station identification code
* YEAR/MONTH/DAY = 8 character date in YYYYMMDD format (e.g. 19860529 = May 29, 1986)
* ELEMENT = 4 character indicator of element type 
* DATA VALUE = 5 character data value for ELEMENT 
* M-FLAG = 1 character Measurement Flag 
* Q-FLAG = 1 character Quality Flag 
* S-FLAG = 1 character Source Flag 
* OBS-TIME = 4-character time of observation in hour-minute format (i.e. 0700 =7:00 am)


In [10]:
columns = ['ID', 'DATE', 'ELEMENT', 'DATA_VALUE', 'M-FLAG', 'Q-FLAG', 'S-FLAG', 'OBS-TIME']

In [11]:
df = dd.read_csv(GHCNP.joinpath("*.csv.gz"), compression='gzip', names=columns, 
                    dtype={'M-FLAG': 'object'})

Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  "Setting ``blocksize=None``" % compression


In [12]:
df.head()

Unnamed: 0,ID,DATE,ELEMENT,DATA_VALUE,M-FLAG,Q-FLAG,S-FLAG,OBS-TIME
0,ITE00100554,17630101,TMAX,-36,,,E,
1,ITE00100554,17630101,TMIN,-50,,,E,
2,ITE00100554,17630102,TMAX,-26,,,E,
3,ITE00100554,17630102,TMIN,-40,,,E,
4,ITE00100554,17630103,TMAX,-9,,,E,


In [13]:
dfa = df.merge(dfst, how='inner', on='ID')

In [14]:
pd.to_datetime(df.head()['DATE'], format='%Y%m%d')

0   1763-01-01
1   1763-01-01
2   1763-01-02
3   1763-01-02
4   1763-01-03
Name: DATE, dtype: datetime64[ns]

In [None]:
with dg.ProgressBar():
    dates = df['DATE'].compute()

[##########################              ] | 65% Completed |  4min 10.6s

In [21]:
def parse_dates(df):
    return pd.to_datetime(df['DATE'], format = '%Y%m%d')

dfa.map_partitions(parse_dates)

ValueError: Metadata inference failed in `parse_dates`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
ValueError("time data '1' does not match format '%Y%m%d' (match)")

Traceback:
---------
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/dataframe/utils.py", line 169, in raise_on_meta_error
    yield
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/dataframe/core.py", line 4827, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "<ipython-input-21-36a547e3816a>", line 2, in parse_dates
    return pd.to_datetime(df['DATE'], format = '%Y%m%d')
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/pandas/util/_decorators.py", line 208, in wrapper
    return func(*args, **kwargs)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/pandas/core/tools/datetimes.py", line 778, in to_datetime
    values = convert_listlike(arg._values, True, format)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/pandas/core/tools/datetimes.py", line 451, in _convert_listlike_datetimes
    raise e
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/pandas/core/tools/datetimes.py", line 416, in _convert_listlike_datetimes
    arg, format, exact=exact, errors=errors
  File "pandas/_libs/tslibs/strptime.pyx", line 142, in pandas._libs.tslibs.strptime.array_strptime


In [65]:
dfg = gpd.GeoDataFrame(gdf = geopandas.GeoDataFrame(
    df, geometry=geopandas.points_from_xy(df.Longitude, df.Latitude)))

Unnamed: 0_level_0,ID,DATE,ELEMENT,DATA_VALUE,M-FLAG,Q-FLAG,S-FLAG,OBS-TIME,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN FLAG,HCN/CRN FLAG,WMO ID
npartitions=258,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1
,object,int64,object,int64,float64,object,object,float64,object,object,object,object,object,object,object,object
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
