In [1]:
#this notebook is designed to access the unavco gps/gnss ftp system, and from the
#file list generated by their DAIv2 interface it will download all of those files,
#since their download manager would not function properly on any of my browsers.

#In addition to this, this notebook separates the measurements in the file by measurement type
#and maintains the RINEX text format
#there is also a diff test that compares each column of the original files with my created files;
#enable or disable it with the run_diff_test flag in the specification cell

#This notebook is threaded to reduce io latency
#at the moment, must be placed in directory above 'Downloads'

In [2]:
import os
import pandas as pd
import numpy as np
from ftplib import FTP
import shutil
import time
import csv
import threading
import math

In [3]:
#selection/specification by user
dirpath="Downloads/testdir3"#directory the files are written to, preferably 1 level away from
                            #where you want the zip to end up
thdcnt=15                   #how many threads (and FTP connections) to create and use

#required files
paths=pd.read_fwf('p007_16_fileList.jsp.txt',header=None,skiprows=2)#get paths from filelist, from DAIv2


#constants
my_col_lab=['year','month','day','hour','minute','second','pressure','temp dry','relative humidity','wind speed','wind direction','rain inc','hail inc']
run_diff_test=True          #set to False to skip the verification pass at the end of the notebook
headerlist= {'col1': ['PR','TD','HR','WS','WD','RI','HI']}
headerdf=pd.DataFrame(data=headerlist)

#one FTP connection per worker thread; FTP(host) connects immediately, login happens in a later cell
ftplist=[FTP('data-out.unavco.org') for _ in range(thdcnt)]

#ORDER MATTERS: both slices read column 0 of the raw file list, so fnames must be taken
#before paths is rebound to the directory part.
#NOTE(review): the offsets 26 and 49 assume every entry has the same fixed-width prefix - confirm for other stations
fnames=paths[0].str.slice(49)       #file name part, passed to RETR
savepath=fnames[0][:-10]#should be same for all files from a single station
paths=paths[0].str.slice(26,49)     #directory part, passed to cwd

#per measurement type lookups, all keyed by the two-letter observation code
headercoldict={'PR':'pressure','TD':'temp dry','HR':'relative humidity','WS':'wind speed','WD':'wind direction','RI':'rain inc','HI':'hail inc'}#code -> column label in my_col_lab
headerextdict={'PR':'_PR','TD':'_TD','HR':'_HR','WS':'_WS','WD':'_WD','RI':'_RI','HI':'_HI'}#code -> output directory suffix
idxdict={'PR':6,'TD':7,'HR':8,'WS':9,'WD':10,'RI':11,'HI':12}#code -> column position in the original file
header_rep='    PR    TD    HR    WS    WD    RI    HI'#type list as it appears in the original header
nancnt={'PR':0,'TD':0,'HR':0,'WS':0,'WD':0,'RI':0,'HI':0}#count of -9999.9 values seen by the diff test
zerocnt={'PR':0,'TD':0,'HR':0,'WS':0,'WD':0,'RI':0,'HI':0}#count of 0.0 values seen by the diff test
tot={'PR':0,'TD':0,'HR':0,'WS':0,'WD':0,'RI':0,'HI':0}#count of rows seen by the diff test
lock=threading.Lock()#serialises the summary printing of the worker threads

def myl1(x):
    """Return str(x) left-padded with spaces to at least 2 characters."""
    return str(x).rjust(2)
def myl2(x):
    """Return str(x) left-padded with spaces to at least 6 characters."""
    return str(x).rjust(6)
def myl3(x):
    """Return str(x) left-padded with spaces to at least 3 characters."""
    return str(x).rjust(3)
def resetnanzero():
    """Zero the diff-test counters (nancnt, zerocnt and tot) for every measurement type.

    tot is the denominator of the percentages printed after the diff test, so it
    has to be cleared together with nancnt and zerocnt; otherwise running the
    diff test a second time keeps adding to tot and the percentages shrink.
    """
    for counter in (nancnt,zerocnt,tot):
        for headerrem in counter:
            counter[headerrem]=0

In [4]:
#set up directories: the download directory plus one sub-directory per measurement type.
#exist_ok=True makes this cell safe to re-run and avoids the check-then-create race.
os.makedirs(dirpath,exist_ok=True)

for state in headerextdict:
    os.makedirs('%s/%s%s'%(dirpath,savepath,headerextdict[state]),exist_ok=True)

In [5]:
#log in on every connection in the pool; each worker thread later gets its own connection
for ftptmp in ftplist:#would like to reduce amount of these in the future
    ftptmp.login()#unavco's ftp is public, no need for username/pass (login() defaults to anonymous)

In [6]:
#sanity check: savepath is the prefix of the per-type output directories (<savepath>_PR, <savepath>_TD, ...)
print(savepath)

p007


In [7]:
def filemanip(filename,myftp):
    """Download one compressed met file and split it into one file per measurement type.

    The file is fetched from the FTP connection's current remote directory into
    the current working directory and decompressed with the system ``gunzip``.
    Then, for every type in ``headerextdict``, a file is written to
    ``<savepath><ext>/<decompressed name>`` holding the rewritten header
    followed by the six timestamp columns and that type's single data column.

    Parameters
    ----------
    filename : str
        Name of the compressed file; the decompressed name is ``filename[:-2]``.
    myftp : ftplib.FTP
        Logged-in connection whose remote working directory contains ``filename``.

    Raises
    ------
    RuntimeError
        If gunzip exits with a status other than 0 (ok) or 2 (warning).
    """
    import subprocess#local import: only this function needs it

    with open(filename, "wb") as gFile:#open file, write binary to my copy
        myftp.retrbinary('%s%s'%('RETR ',filename), gFile.write)

    #argument list instead of a shell string, so odd characters in the name cannot be
    #interpreted by a shell; -f overwrites the output of an earlier run instead of
    #silently keeping the stale file
    result=subprocess.run(['gunzip','-f',filename],stderr=subprocess.PIPE)
    if result.returncode not in (0,2):
        raise RuntimeError('gunzip failed for %s (exit status %s): %s'%(filename,result.returncode,result.stderr.decode(errors='replace').strip()))
    tmpname=filename[:-2]#remove unix compress extension

    #read the source once; the header is re-filtered from these lines for every type
    with open(tmpname, "r") as p:
        lines=p.readlines()

    df=None#data table, parsed once on the first pass and reused for every type
    for headerrem in headerextdict:
        s=''#header text for this type's file
        count=0#number of header lines in the source, including END OF HEADER
        flg=False#True once the '# / TYPES OF OBSERV' line has been seen
        for line in lines:
            count+=1
            if '# / TYPES OF OBSERV' in line:
                #declare a single observation type instead of the original seven
                line=line.replace('7','1',1)
                line=line.replace(header_rep,'%s%s%s'%('    ',headerrem,'                                    '),1)
                flg=True
            if ' END OF HEADER' in line:
                s+=line#always written exactly once
                break
            #before the type list everything is kept; after it only lines mentioning this type
            if not flg or headerrem in line:
                s+=line
        if df is None:
            df=pd.read_fwf(tmpname,header=None,sep=" ",skiprows=count,widths=[3,3,3,3,3,3,7,7,7,7,7,7,7])#spacing is fixed so this should work for all files discounting the string header
            df.columns=my_col_lab
            df[['month','day','hour','minute','second']]=df[['month','day','hour','minute','second']].applymap(myl1)#adjust select column spacing here
            df[['pressure','temp dry','relative humidity','wind speed','wind direction','rain inc','hail inc']]=df[['pressure','temp dry','relative humidity','wind speed','wind direction','rain inc','hail inc']].applymap(myl2)
            df['year']=df['year'].apply(myl3)
        savename='%s%s/%s'%(savepath,headerextdict[headerrem],tmpname)

        #the padded values get quoted by to_csv; stripping the quotes restores the fixed-width look
        dff=df[['year','month','day','hour','minute','second',headercoldict[headerrem]]].to_csv(header=False,index=False,sep=' ').replace('"', '')

        with open(savename, "w") as f:
            f.write('%s%s'%(s,dff))
    return

In [8]:
#work inside dirpath from here on: filemanip, the zip cell and diff_test all use paths relative to the current directory
os.chdir(dirpath)

In [9]:
def thdfunc(start,name,end,myftp):
    """Worker: download and split the files at positions start..end-1 of the file list.

    Parameters
    ----------
    start, end : int
        Half-open range of positions into ``paths``/``fnames`` handled by this worker.
    name : any
        Label used only in the summary that is printed when the worker finishes.
    myftp : ftplib.FTP
        Connection used for every file of this worker.
    """
    start_time = time.time()
    for x in range(start,end):#manipulate every file from every path
        myftp.cwd(paths[x])   #how I navigate
        filemanip(fnames[x],myftp)
        myftp.cwd("/")        #reset navigation
    end_time= time.time()
    nfiles=end-start
    with lock:#keep the summary lines of different threads from interleaving
        print('------------thd:thread%s  finished------------'%(name,))
        print("tot time: %s seconds" % (end_time - start_time))
        if nfiles>0:
            #7 = files written per source file (one per measurement type)
            print('approx. time per source file %s seconds, per my file %s seconds' % (str(int(end_time - start_time)/nfiles),str(int(end_time - start_time)/(nfiles*7))))
        else:
            #an empty slice happens when there are more threads than files; the
            #per-file averages would divide by zero
            print('no files were assigned to this thread')

In [10]:
def compute_end(count,num,i):
    """Return the exclusive end position of the slice of the file list for thread i.

    The last thread (i == thdcnt-1) runs to the end of ``paths`` so it also
    picks up the remainder; thread 0 ends at ``num``; every other thread ends
    at ``count+num``.
    """
    if i==thdcnt-1:
        return paths.size
    return num if i==0 else count+num

In [11]:
#give every thread a contiguous slice of inter files; compute_end hands the remainder to the last one
inter=math.floor(paths.size/thdcnt)
count=0
threads=[]
tot_st_time=time.time()
for i in range(0,thdcnt):
    stop=compute_end(count,inter,i)
    mythread=threading.Thread(target=thdfunc,args=(count,i,stop,ftplist[i]))
    threads.append(mythread)
    mythread.start()
    count+=inter
print('---all threads started---')

#wait for every worker before stopping the clock
for thd in threads:
    thd.join()
tot_end_time=time.time()

#the downloads are done, so the pooled connections can be closed
for ftprid in ftplist:
    ftprid.quit()

print ("threads finished...exiting tot time=%s"%(tot_end_time-tot_st_time))

---all threads started---
------------thd:thread8  finished------------
tot time: 25.505703926086426 seconds
approx. time per source file 1.0416666666666667 seconds, per my file 0.1488095238095238 seconds
------------thd:thread1  finished------------
tot time: 25.828577041625977 seconds
approx. time per source file 1.0416666666666667 seconds, per my file 0.1488095238095238 seconds
------------thd:thread13  finished------------
tot time: 27.114087104797363 seconds
approx. time per source file 1.125 seconds, per my file 0.16071428571428573 seconds
------------thd:thread12  finished------------
tot time: 28.242856979370117 seconds
approx. time per source file 1.1666666666666667 seconds, per my file 0.16666666666666666 seconds
------------thd:thread5  finished------------
tot time: 28.74414610862732 seconds
approx. time per source file 1.1666666666666667 seconds, per my file 0.16666666666666666 seconds
------------thd:thread2  finished------------
tot time: 28.948076963424683 seconds
appro

In [12]:
#when this hits 1 you are done with that station's measurements for that year

In [13]:
os.getcwd()#shows the current working directory; it should contain our created directories

'/Users/nickvancise/Downloads/testdir3'

In [14]:
#one zip per measurement type; the archive is named after the directory it packs
for headerrem in headerextdict:
    type_dir=savepath+headerextdict[headerrem]
    shutil.make_archive(type_dir,'zip',type_dir)#zip for download/upload
print('---zips created---')

---zips created---


In [15]:
def _header_line_count(path,default):
    """Return how many lines of path precede the data, i.e. up to and including ' END OF HEADER'.

    Returns default when the file contains no such line.
    """
    with open(path,"r") as handle:
        for lineno,line in enumerate(handle,start=1):
            if ' END OF HEADER' in line:
                return lineno
    return default

def diff_test():
    """Compare every per-type file with the matching column of its original file.

    For each name in ``fnames`` the decompressed original (in the current
    directory) is read once; for each measurement type its timestamp columns
    plus that type's column are compared with the file written to
    ``<savepath><ext>/``. A running count is printed, and every mismatch is
    reported together with the differing rows.

    Side effects: adds to the module-level counters ``nancnt`` (values equal
    to -9999.9), ``zerocnt`` (values equal to 0.0) and ``tot`` (rows seen).

    Returns
    -------
    bool
        True when every compared pair is identical, otherwise False.
    """
    count=0
    all_ident=True
    total=fnames.size*len(idxdict)#number of comparisons, used to end the progress line
    for name in fnames:
        tmpname=name[:-2]
        #the header length is located in the file itself; 15 is the previous hard-coded value, kept as fallback
        dftest1=pd.read_fwf(tmpname,skiprows=_header_line_count(tmpname,15),sep='+s/',header=None,widths=[3,3,3,3,3,3,7,7,7,7,7,7,7])#orig file
        for headerrem in idxdict:
            count=count+1
            myfile=savepath+headerextdict[headerrem]+'/'+tmpname
            #fallbacks 9 (PR) and 8 (others) are the previous hard-coded header lengths
            dftest2=pd.read_fwf(myfile,skiprows=_header_line_count(myfile,9 if headerrem=='PR' else 8),sep='+s/',header=None,widths=[3,3,3,3,3,3,8])#my file
            #'\r' keeps overwriting the counter in place until the last comparison
            print('%s%s'%(str(count),'           '),end='\n' if (count>=total) else '\r')
            orig_idx=[0,1,2,3,4,5,idxdict[headerrem]]
            dftemp=dftest1[orig_idx]
            dftemp.columns=[0,1,2,3,4,5,6]#renumber so the labels line up with my file
            nancnt[headerrem]+=(dftemp.loc[:,6].astype(float)==-9999.9).sum()
            zerocnt[headerrem]+=(dftemp.loc[:,6].astype(float)==0.0).sum()
            tot[headerrem]+=len(dftemp.index)
            if not dftemp.equals(dftest2):
                print ("The lists are not identical "+headerextdict[headerrem]+": "+name[:-2]+" "+tmpname)#col is  zero indexed
                all_ident=False
                #rows present in only one of the two frames
                newdf=pd.concat([dftemp,dftest2]).drop_duplicates(keep=False)
                print('problem rows')
                print(newdf)
    return all_ident

In [16]:
#diff_test prints a running count of the files it has checked, lists every file that
#differs from its original counterpart, and returns False if any difference was found

In [18]:
#optional verification pass: time the diff test, then report how many values per
#measurement type were the -9999.9 marker or exactly zero
if run_diff_test:
    resetnanzero()
    diff_start_time = time.time()
    b=diff_test()
    diff_end_time=time.time()
    if b:
        print('---all files are identical---')

    for headerrem,nan_seen in nancnt.items():
        print('nancnt %s=%s tot=%s percnan=%s%%'%(headerrem,str(nan_seen),str(tot[headerrem]),str((float(nan_seen)/tot[headerrem])*100)))
    print('\n')
    for headerrem,zero_seen in zerocnt.items():
        print('zerocnt %s=%s tot=%s perczero=%s%%'%(headerrem,str(zero_seen),str(tot[headerrem]),str((float(zero_seen)/tot[headerrem])*100)))

    print("diff time: %s seconds" % (diff_end_time - diff_start_time))

2562           
---all files are identical---
nancnt PR=0 tot=210816 percnan=0.0%
nancnt TD=0 tot=210816 percnan=0.0%
nancnt HR=22 tot=210816 percnan=0.01043564055859138%
nancnt WS=98813 tot=210816 percnan=46.87167956891318%
nancnt WD=98813 tot=210816 percnan=46.87167956891318%
nancnt RI=0 tot=210816 percnan=0.0%
nancnt HI=0 tot=210816 percnan=0.0%


zerocnt PR=0 tot=210816 perczero=0.0%
zerocnt TD=775 tot=210816 perczero=0.3676191560412872%
zerocnt HR=0 tot=210816 perczero=0.0%
zerocnt WS=1 tot=210816 perczero=0.00047434729811778997%
zerocnt WD=4 tot=210816 perczero=0.0018973891924711599%
zerocnt RI=102768 tot=210816 perczero=48.747723132969035%
zerocnt HI=105403 tot=210816 perczero=49.99762826350941%
diff time: 16.256608963012695 seconds
