import os
from datetime import datetime
data = sc.textFile("/user/romain/bikes/201408_weather_data.csv")
print 'Number of records: ' + str(data.count())
print '\nFirst ten records:'
for row in data.take(10):
print row
headers = data.first()
print 'Raw headers:\n%s' % headers
headers_names = [header.replace(' ', '') for header in headers.split(',')]
print '\nCleaned headers:\n%s' % headers_names
import re
def guess_type(cell):
if re.match('\d+\.\d+', cell):
return '_f'
elif re.match('\d/\d/\d\d\d\d', cell):
return '_tdt'
elif re.match('\d+', cell):
return '_i'
return '_s'
# We guess the type from the first row of data
headers_types = [guess_type(cell) for cell in data.take(2)[1].split(',')]
headers = [''.join(h) for h in zip(headers_names, headers_types)]
headers = ','.join(headers)
print 'Unified headers:\n%s' % headers
def clean_row(row):
cols = row.split(',')
date = datetime.strptime(cols[0], '%m/%d/%Y')
cols[0] = date.strftime('%Y-%m-%dT%H:%M:%SZ')
if cols[-5] == 'T':
cols[-5] = '0'
return ','.join(cols)
# Skip the header row and apply the cleaning function
records = data.filter(lambda row: not row.startswith('PDT')).map(clean_row)
print 'First ten records with data normalized:'
for row in records.take(10):
print row
csv = headers + '\n' + records.reduce(lambda a, b: a + '\n' + b)
print 'Beginning of csv data:\n%s' % csv[:1000]
import urllib2
req = urllib2.Request('http://localhost:8983/solr/baybikes/update?commitWithin=5000')
req.add_header('Content-Type', 'text/csv')
response = urllib2.urlopen(req, csv)
print 'Done!'
