## Load from CSV
This notebook copies the beaked-whale CSV files (stored on S3 under the `CVS` prefix, with a `.cvs` file extension) from S3 to HDFS and then loads the data into a Spark DataFrame.


In [None]:
from pyspark import SparkContext
# Connect to the Spark cluster (the master URL is SparkContext's first positional argument).
# NOTE(review): `master_url` is not defined in any cell shown here; it must already
# exist in the kernel -- TODO confirm where it is set.
sc = SparkContext(master_url)

In [None]:
from pyspark.sql import Row, SQLContext,DataFrame
# NOTE(review): wildcard import of the Spark SQL type classes; none are referenced
# by name in the cells shown here.
from pyspark.sql.types import *

# SQL entry point used below to build a DataFrame from an RDD of Rows.
# SQLContext is the pre-Spark-2.0 API (SparkSession replaces it in Spark 2+).
sqlContext = SQLContext(sc)

# NOTE(review): %pylab star-imports numpy/matplotlib names into the namespace and is
# discouraged in current IPython; `%matplotlib inline` plus explicit imports is preferred.
%pylab inline

import pandas as pd
import datetime as dt

from scipy.io import loadmat,savemat,whosmat

# Python 2 only: `string.split` does not exist in Python 3.
from string import split
from collections import Counter
import re
import numpy as np
from numpy import shape

from glob import glob

### Format of the CSV files
Each line has the following comma-separated fields. The number before each field name is the column index at which the field starts.

| Field        | Description                        | Data type |
|--------------|------------------------------------|-----------|
| 0: time      | time of click                      | string in datetime format `%Y-%m-%d %H:%M:%S.%f` |
| 1: species   | initial species classification     | `str` |
| 2: site      | name of site                       | `str` |
| 3: rec_no    | recording number                   | `str` |
| 4: bout_i    | bout number                        | `numpy.int64` |
| 5: peak2peak | peak-to-peak magnitude             | `numpy.float64` |
| 6: MSN       | waveform                           | array of length 202 (columns 6–207) |
| 208: MSP     | spectrum                           | array of length 101 (columns 208–308) |
| 309: TPWS1   | 1 if the click appears in TPWS1    | bool (0/1) |
| 310: MD1     | 1 if the click appears in MD1      | bool (0/1) |
| 311: FD1     | 1 if the click appears in FD1      | bool (0/1) |
| 312: TPWS2   | 1 if the click appears in TPWS2    | bool (0/1) |
| 313: MD2     | 1 if the click appears in MD2      | bool (0/1) |
| 314: FD2     | 1 if the click appears in FD2      | bool (0/1) |
| 315: TPWS3   | 1 if the click appears in TPWS3    | bool (0/1) |
| 316: MD3     | 1 if the click appears in MD3      | bool (0/1) |
| 317: FD3     | 1 if the click appears in FD3      | bool (0/1) |

Total number of fields: 318


In [5]:
Fields_string="""(time       , datetime),
(species	   , str),
(site	   , str),
(rec_no	   , str),
(bout_i	   , int),
(peak2peak  , float),
(MSN	   , array,202),
(MSP	   , array,101  ),
(TPWS1	   , bool),
(MD1	   , bool),
(FD1	   , bool),
(TPWS2	   , bool),
(MD2	   , bool),
(FD2	   , bool),
(TPWS3	   , bool),
(MD3	   , bool),
(FD3	   , bool)"""
import re
pattern=re.compile(r'\(([\.\w]*)\s*,\s*([\,\.\w]*)\s*\)')

for line in Fields_string.split('\n'):
    #print '\n',line
    match=pattern.search(line)
    if match:
        print "('%s', '%s'),"%(match.group(1),match.group(2))
    else:
        print 'no match'

('time', 'datetime'),
('species', 'str'),
('site', 'str'),
('rec_no', 'str'),
('bout_i', 'int'),
('peak2peak', 'float'),
('MSN', 'array,202'),
('MSP', 'array,101'),
('TPWS1', 'bool'),
('MD1', 'bool'),
('FD1', 'bool'),
('TPWS2', 'bool'),
('MD2', 'bool'),
('FD2', 'bool'),
('TPWS3', 'bool'),
('MD3', 'bool'),
('FD3', 'bool'),


In [6]:
# Column layout of every CSV line, in order. Scalar fields are (name, type);
# array fields are (name, 'array', length) and occupy `length` consecutive columns.
# The nine detection flags follow the repeating pattern TPWS<n>, MD<n>, FD<n>.
Fields=([('time', 'datetime'),
         ('species', 'str'),
         ('site', 'str'),
         ('rec_no', 'str'),
         ('bout_i', 'int'),
         ('peak2peak', 'float'),
         ('MSN', 'array', 202),
         ('MSP', 'array', 101)]
        + [(flag+run, 'bool') for run in '123' for flag in ('TPWS', 'MD', 'FD')])


### Parsing code

In [8]:
date_format='%Y-%m-%d %H:%M:%S.%f'
def parse_date(s):
    """Parse a click timestamp string into a datetime.datetime.

    Timestamps are expected in `date_format` (with fractional seconds). A
    timestamp with no fractional part at all is also accepted, because
    str(datetime) drops the ".ffffff" suffix when microsecond == 0. One such
    value would otherwise abort the whole Spark job.

    Raises ValueError if `s` cannot be parsed.
    """
    try:
        return dt.datetime.strptime(s,date_format)
    except ValueError:
        # Only fall back when the fractional part is genuinely absent;
        # otherwise surface the original, more informative error.
        if '.' in s:
            raise
    return dt.datetime.strptime(s,'%Y-%m-%d %H:%M:%S')
def parse_array(a):
    """Convert a sequence of numeric strings to float64 and pack the raw bytes via packArray."""
    values=np.fromiter((np.float64(item) for item in a),dtype=np.float64)
    return packArray(values)
def parse_int(s):
    """Convert one CSV item to an int (the rules below also use this for the 0/1 flag columns)."""
    value=int(s)
    return value
def parse_float(s):
    """Convert one CSV item to a float."""
    value=float(s)
    return value
def parse_string(s):
    """Return the CSV item unchanged; string fields need no conversion."""
    return s

def packArray(a):
    """Serialize a numpy array's raw buffer into a bytearray.

    Only the raw bytes are kept; the dtype and shape are not stored, so the
    reader must know them to decode (see unpackArray).

    Raises TypeError if `a` is not a numpy.ndarray. TypeError subclasses
    Exception, so existing `except Exception` handlers still catch it.
    """
    # isinstance also accepts ndarray subclasses, which tobytes() handles fine.
    if not isinstance(a,np.ndarray):
        raise TypeError("input to packArray should be numpy.ndarray. It is instead "+str(type(a)))
    return bytearray(a.tobytes())
def unpackArray(x,data_type=np.int16):
    """Decode a raw byte buffer (such as one produced by packArray) into a 1-D numpy array.

    NOTE: the default dtype is int16, but parse_array packs float64 arrays.
    Pass data_type=np.float64 when decoding the MSN/MSP columns built here.
    """
    decoded=np.frombuffer(x,data_type)
    return decoded


# Build one parse rule per entry of Fields: the half-open range [start, end) of
# comma-separated items the field occupies, plus the function that converts them.
_PARSERS={'array':parse_array,
          'datetime':parse_date,
          'int':parse_int,
          'float':parse_float,
          'bool':parse_int,     # flags are stored as 0/1 and kept as ints
          'str':parse_string}
Parse_rules=[]
index=0
for field in Fields:
    _type=field[1]
    if _type not in _PARSERS:
        # Fail fast: otherwise `parser` would silently keep the previous
        # field's value (or be undefined for the first field).
        raise ValueError('unrecognized type %r for field %r'%(_type,field[0]))
    parser=_PARSERS[_type]
    # Arrays span field[2] consecutive items; every other type spans one.
    _len=int(field[2]) if _type=='array' else 1
    rule={'name':field[0],
          'start':index,
          'end':index+_len,
          'parser':parser}
    print('%s %s'%(field,rule))
    Parse_rules.append(rule)
    index+=_len


def parse(row):
    """Parse one CSV line into a RowObject.

    The line is split on ',' and each rule in the module-level Parse_rules
    converts its slice of items. A single-item field receives the bare string;
    a multi-item field receives the list of strings. The converted values are
    passed positionally to the module-level RowObject.
    NOTE(review): assumes no field value itself contains a comma.
    """
    cells=row.split(',')
    values=[]
    for rule in Parse_rules:
        lo,hi,convert=rule['start'],rule['end'],rule['parser']
        chunk=cells[lo] if hi-lo==1 else cells[lo:hi]
        values.append(convert(chunk))
    return RowObject(*values)

# Column names for the Spark rows, in the same order as Parse_rules.
field_names=[]
for pr in Parse_rules:
    field_names.append(pr['name'])
print(field_names)
# Row(*names) yields a Row factory; calling it with values in that order
# (as parse() does) produces a named Row.
RowObject=Row(*field_names)
RowObject

### Set up S3 access

In [219]:
# NOTE(review): hard-coded absolute path. Credentials.ipynb presumably defines
# s3helper, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY used below -- TODO confirm.
%cd /root/ipython/BeakedWhaleClassification/
%run Credentials.ipynb

/root/ipython/BeakedWhaleClassification


In [220]:
# Register the AWS keys with the S3 helper. None of these names is defined in the
# cells shown here; presumably they come from Credentials.ipynb -- TODO confirm.
s3helper.set_credential(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

In [7]:
# The bucket name really is 'while-classification' (sic); the listing below
# confirms it holds the data.
s3helper.open_bucket('while-classification')
s3helper.ls_s3()

[u'CVS',
 u'DT_Cuviers',
 u'DT_Gervais',
 u'GC_Cuviers',
 u'GC_Gervais',
 u'MC_Cuviers',
 u'MC_Gervais']

In [8]:
# List the entries under the CVS prefix and show the first 10.
dirs=s3helper.ls_s3('CVS')
dirs[:10]

[u'CVS/DT.01.Cuviers',
 u'CVS/DT.01.Gervais',
 u'CVS/DT.02.Cuviers',
 u'CVS/DT.02.Gervais',
 u'CVS/DT.03.Cuviers',
 u'CVS/DT.03.Gervais',
 u'CVS/DT.04.Cuviers',
 u'CVS/DT.04.Gervais',
 u'CVS/DT.05.Cuviers',
 u'CVS/DT.05.Gervais']

In [7]:
from time import time

### Copy from S3 to HDFS

In [14]:
# Copy the CVS prefix from S3 into HDFS and report the elapsed time in seconds.
# Re-running reports "File exists" for every file already copied (see output).
t1=time()
s3helper.s3_to_hdfs('CVS', 'CVS')
time()-t1

cp: `/CVS/DT.01.Cuviers.1.cvs': File exists
cp: `/CVS/DT.01.Cuviers.10.cvs': File exists
cp: `/CVS/DT.01.Cuviers.11.cvs': File exists
cp: `/CVS/DT.01.Cuviers.12.cvs': File exists
cp: `/CVS/DT.01.Cuviers.13.cvs': File exists
cp: `/CVS/DT.01.Cuviers.14.cvs': File exists
cp: `/CVS/DT.01.Cuviers.15.cvs': File exists
cp: `/CVS/DT.01.Cuviers.16.cvs': File exists
cp: `/CVS/DT.01.Cuviers.17.cvs': File exists
cp: `/CVS/DT.01.Cuviers.18.cvs': File exists
cp: `/CVS/DT.01.Cuviers.19.cvs': File exists
cp: `/CVS/DT.01.Cuviers.2.cvs': File exists
cp: `/CVS/DT.01.Cuviers.20.cvs': File exists
cp: `/CVS/DT.01.Cuviers.21.cvs': File exists
cp: `/CVS/DT.01.Cuviers.22.cvs': File exists
cp: `/CVS/DT.01.Cuviers.23.cvs': File exists
cp: `/CVS/DT.01.Cuviers.3.cvs': File exists
cp: `/CVS/DT.01.Cuviers.4.cvs': File exists
cp: `/CVS/DT.01.Cuviers.5.cvs': File exists
cp: `/CVS/DT.01.Cuviers.6.cvs': File exists
cp: `/CVS/DT.01.Cuviers.7.cvs': File exists
cp: `/CVS/DT.01.Cuviers.8.cvs': File exists
cp: `/CVS/DT.01.Cu

341.0969409942627

### Parsing code

### Read data into dataframe

In [None]:
# NOTE(review): DataFrame is already imported in the imports cell.
from pyspark.sql import DataFrame
# Read every file under /CVS/ on HDFS (the copy target above) as lines of text.
CVS_Data=sc.textFile("/CVS/")
# Convert each line into a RowObject using the parse rules defined above.
RDD=CVS_Data.map(parse)

# No schema is given, so Spark infers the DataFrame schema from the Row objects.
df=sqlContext.createDataFrame(RDD)

In [82]:
# Mark the DataFrame for caching; count() forces a full pass, which parses and caches it.
t0=time()
df.cache().count()
time()-t0

305.18370389938354

In [84]:
# Count again; after the cache() above this should be served from memory
# (compare the two timings).
t0=time()
print df.count()
time()-t0

6353182


0.7476310729980469