In [4]:
import urllib
import os
import datetime
import zipfile
import csv

def is_number(s):
    """Report whether ``s`` can be interpreted as a number.

    ``float(s)`` is attempted first. If that raises ``ValueError`` the value
    is checked with ``unicodedata.numeric`` so that a single numeric Unicode
    character is accepted as well.

    :param s: value to test (normally a string)
    :return: True when either conversion succeeds, otherwise False
    """
    try:
        float(s)
    except ValueError:
        # Not parseable as a float: fall back to the Unicode numeric lookup.
        import unicodedata
        try:
            unicodedata.numeric(s)
        except (TypeError, ValueError):
            return False
    return True

def vancouver_filename(year):
    """Return the archive name used for the given licence year.

    The current calendar year maps to the bare archive name; any other year
    gets the year prepended to it.

    :param year: licence year (int)
    :return: archive file name as a string
    """
    base_name = "business_licences_csv.zip"
    current_year = datetime.datetime.now().year
    return base_name if year == current_year else str(year) + base_name

def vancouver_yearfromfilename(filename):
    """Extract the licence year encoded in the first four characters of a file name.

    :param filename: file name (not a path), e.g. ``"1997business_licences.csv"``
    :return: the leading four characters as an int when they parse as an
        integer, otherwise the current calendar year
    """
    try:
        # int() is the real requirement here; a float-style prefix such as
        # "12.5" used to pass the numeric check and then crash in int().
        return int(filename[:4])
    except ValueError:
        return datetime.datetime.now().year

def download_all_vancouverdata(ftp_url,download_folder, startyear, endyear):
    """
    Download the yearly business-licence zip archives from the Vancouver FTP site.

    Uses the Python 2 ``urllib.urlretrieve`` API, like the rest of this notebook.

    :param ftp_url: base URL; the per-year archive name is appended to it as-is
    :param download_folder: local directory for the archives (created if missing)
    :param startyear: first year to download (inclusive)
    :param endyear: last year to download (inclusive)
    :return: None
    """
    if not os.path.exists(download_folder):
        os.makedirs(download_folder)

    for year in range(startyear, endyear+1):
        # The remote archive and the local copy share the same name.
        archive_name = vancouver_filename(year)
        urllib.urlretrieve(
            ftp_url + archive_name,
            os.path.join(download_folder, archive_name))

        # Drop urlretrieve's temporary files/cache between downloads.
        urllib.urlcleanup()
        print("Vancouver Data: Year:"+str(year) + " is downloaded.")

def merge_all_vancouver_csv(source_folder, merged_filename_path):
    """Concatenate every CSV in ``source_folder`` into one file with a year column.

    The header of the first non-empty file is written once with an extra
    ``YearRecorded`` column; every data row gets the year derived from its
    file name appended. Commas are replaced by spaces and double quotes are
    removed inside each field so the output can be split on plain commas.

    :param source_folder: directory containing the extracted CSV files
    :param merged_filename_path: path of the merged CSV to (over)write
    :return: None
    """
    is_header_printed = False

    # `with` guarantees the merged file is closed even if a source file fails.
    with open(merged_filename_path, 'w') as merged_file:
        # Sorted so repeated runs produce the same row order (listdir order is arbitrary).
        for filename in sorted(os.listdir(source_folder)):
            # Read from the folder that was passed in, not from a module-level constant.
            source_path = os.path.join(source_folder, filename)
            if not os.path.isfile(source_path):
                continue

            fileyear = vancouver_yearfromfilename(filename)
            with open(source_path, 'r') as csvfile:
                reader = csv.reader(csvfile)
                header = next(reader, None)
                if header is None:
                    # Completely empty file: nothing to merge.
                    continue

                if not is_header_printed:
                    merged_file.write(','.join(header) + "," + "YearRecorded" + "\n")
                    is_header_printed = True

                for row in reader:
                    # Rows with a single cell (or none) are treated as noise and skipped.
                    if len(row) > 1:
                        cleaned = [x.replace(',', ' ').replace("\"", "") for x in row]
                        merged_file.write(','.join(cleaned) + ',' + str(fileyear) + "\n")

def unzip_files(source_folder, destination_folder):
    """Extract every zip archive found directly inside ``source_folder``.

    Entries that are not zip archives (sub-directories, stray files) are
    skipped; this matters when the destination directory lives inside the
    source directory and the function is run a second time.

    :param source_folder: directory containing the ``.zip`` archives
    :param destination_folder: directory the archive members are extracted into
    :return: None
    """
    for zip_file in os.listdir(source_folder):
        zip_path = os.path.join(source_folder, zip_file)
        if not (os.path.isfile(zip_path) and zipfile.is_zipfile(zip_path)):
            continue

        # Context manager closes the archive even if extraction fails.
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(destination_folder)
        print("Unzipped vancouver data, Filename:"+zip_file)

    print("All vancouver files unzipped at:"+destination_folder)
        

# Locations used by the Vancouver download / unzip / merge pipeline.
VANCOUVER_ZIP_FILES_PATH = "/home/jay/BigData/SFUCourses/CMPT733/Project/Data/Vancouver/"
VANCOUVER_FTP_URL = "ftp://webftp.vancouver.ca/OpenData/csv/"
VANCOUVER_EXTRACT_FILES = "/home/jay/BigData/SFUCourses/CMPT733/Project/Data/Vancouver/csv/"
VANCOUVER_MERGED_FILE = "/home/jay/BigData/SFUCourses/CMPT733/Project/Data/merged_vancouver.csv"

# The pipeline resumes here: every step whose number is >= this value runs.
CURRENT_STEP_NO = 3

# (step number, action) pairs, listed in execution order.
_VANCOUVER_STEPS = (
    # Step 1: Download vancouver data
    (1, lambda: download_all_vancouverdata(
        VANCOUVER_FTP_URL, VANCOUVER_ZIP_FILES_PATH, 1997, 2017)),
    # Step 2: Unzip vancouver data
    (2, lambda: unzip_files(VANCOUVER_ZIP_FILES_PATH, VANCOUVER_EXTRACT_FILES)),
    # Step 3: Merge all vancouver data with year at the end as YearRecorded
    (3, lambda: merge_all_vancouver_csv(VANCOUVER_EXTRACT_FILES, VANCOUVER_MERGED_FILE)),
)

for _step_no, _run_step in _VANCOUVER_STEPS:
    if CURRENT_STEP_NO <= _step_no:
        _run_step()
    



In [5]:
# Spark Usage
# Collapses the merged licence file to one row per (BusinessType, BusinessName)
# and exports the rows that have no latitude.
# `sqlContext` is not defined in this cell; it must already exist when the cell
# runs (presumably supplied by the PySpark kernel -- TODO confirm).

MERGED_VANCOUVER_DATA = "/home/jay/BigData/SFUCourses/CMPT733/Project/Data/merged_vancouver.csv"
EACH_VANCOUVER_BUSINESS = '/home/jay/BigData/SFUCourses/CMPT733/Project/Data/businesswise_vancouver.csv'
UNKNOWN_LATLONG_VANCOUVER ='/home/jay/BigData/SFUCourses/CMPT733/Project/Data/unknown_latlong_vancouver.csv'
UNKNOWN_WITH_ADDRESS = '/home/jay/BigData/SFUCourses/CMPT733/Project/Data/unknown_withaddress.csv'

# Load the merged CSV with a header row and inferred column types.
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true')\
.load(MERGED_VANCOUVER_DATA)

# Sort by year before registering the table used by the SQL below.
df = df.orderBy('YearRecorded')
df.registerTempTable("VancouverBusiness")

# One row per (BusinessType, BusinessName): earliest issue date, latest expiry
# date, plus first()/last() picks for the remaining columns.
# NOTE(review): first()/last() depend on row order inside each group; whether the
# orderBy above survives the GROUP BY is not visible here -- confirm.
startend_business = sqlContext.sql("""
SELECT BusinessName, min(IssuedDate) IssueDate, max(ExpiredDate) ExpiryDate, 
last(YearRecorded) Year, last(Status) LastStatus, BusinessType, 
first(House) House, 
first(Street) Street, 
first(City) City, 
first(Province) Province, 
first(Country) Country, 
first(PostalCode) PostalCode,
first(Latitude) Latitude, 
first(Longitude) Longitude,
first(LocalArea) LocalArea
FROM VancouverBusiness
GROUP BY BusinessType, BusinessName
""")
#startend_business.show()
# coalesce(1) reduces the result to a single partition before writing it out.
startend_business.coalesce(1).write.format('com.databricks.spark.csv')\
.options(header='true').save(EACH_VANCOUVER_BUSINESS)

# Address columns of the businesses whose Latitude is NULL.
startend_business.registerTempTable('UniqueVancouverBusiness')
lat_null = sqlContext.sql("""
SELECT House, Street, City, Province, Country, PostalCode, LocalArea
FROM UniqueVancouverBusiness
WHERE Latitude IS null
""")

lat_null.coalesce(1).write.format('com.databricks.spark.csv')\
.options(header='true').save(UNKNOWN_LATLONG_VANCOUVER)

#startend_business.show()

# Despite the variable name, this keeps the rows where AT LEAST ONE of the
# address fields (LocalArea excluded) is non-NULL.
lat_null.registerTempTable('UnknownAddress')
all_blank_address = sqlContext.sql("""
SELECT House, Street, City, Province, Country, PostalCode, LocalArea
FROM UnknownAddress
WHERE House IS NOT NULL OR 
Street IS NOT NULL OR 
City IS NOT NULL OR 
Province IS NOT NULL OR
Country IS NOT NULL OR 
PostalCode IS NOT NULL
""")

all_blank_address.coalesce(1).write.format('com.databricks.spark.csv')\
.options(header='true').save(UNKNOWN_WITH_ADDRESS)
# Number of address rows written above (Python 2 print statement).
print all_blank_address.count()


39006


In [26]:
# Cassandra connection to create table
from cassandra.cluster import Cluster
import csv

# Cassandra contact point(s) and the keyspace the session is bound to.
HOSTS = ['127.0.0.1']
KEYSPACE = 'jmaity'
cluster = Cluster(HOSTS)
session = cluster.connect(KEYSPACE)

# Address table with nullable Latitude/Longitude columns. Every column except
# Latitude and Longitude is part of the primary key; Id is the partition key.
query ="""
CREATE TABLE IF NOT EXISTS AddressLatLong (
  Id UUID,
  IsUpdated Boolean,
  House text,
  Street text,
  City text,
  Province text,
  Country text,
  PostalCode text,
  Latitude float,
  Longitude float,
  PRIMARY KEY (Id, House, Street, City, Province, Country, PostalCode, IsUpdated)
)
"""
# IF NOT EXISTS makes re-running this statement safe when the table is already there.
item1 = session.execute(query)

# Table name spliced into the INSERT statements built further down in this notebook.
TABLE_NAME = 'AddressLatLong'

def format_string(val):
    """Prepare a string for embedding in a single-quoted CQL literal.

    :param val: raw field value (string)
    :return: a single space for the empty string, otherwise ``val`` with
        every single quote doubled
    """
    return val.replace("'", "''") if val != "" else " "

def format_insert(csvline):
    """Build one CQL INSERT statement for an address row.

    :param csvline: sequence of strings; items 0-5 are used as House, Street,
        City, Province, Country and PostalCode (extra items are ignored)
    :return: the statement text terminated by ``";\\n"``, or ``""`` when
        ``csvline`` is empty
    :raises IndexError: when a non-empty row has fewer than six items
    """
    if len(csvline) == 0:
        # Used to fall off the end and return None, which breaks callers that
        # concatenate the result onto a batch string.
        return ""

    sql_insert_template = "INSERT INTO "+TABLE_NAME+\
    "(Id, IsUpdated, House, Street, City, Province, Country, PostalCode) " \
    "VALUES(UUID(), false,'{0}','{1}','{2}','{3}','{4}','{5}');\n"

    # NOTE(review): values are spliced into the statement text; only single
    # quotes are escaped (by format_string). Prefer bound parameters if the
    # input can ever be untrusted.
    fields = [format_string(csvline[index]) for index in range(6)]
    return sql_insert_template.format(*fields)


def execute_sql_batches(filename, row_per_batch, max_batches=5):
    """Insert the rows of a CSV file into Cassandra in ``BEGIN BATCH`` groups.

    The first line of the file is treated as a header and skipped. Each
    remaining row is turned into an INSERT statement by ``format_insert`` and
    statements are executed ``row_per_batch`` at a time on the module-level
    ``session``.

    :param filename: path of the CSV file to load
    :param row_per_batch: number of INSERT statements per batch (must be >= 1)
    :param max_batches: stop after this many batches; ``None`` loads the whole
        file. Defaults to 5, the limit that used to be hard-coded.
    :return: None
    :raises ValueError: when ``row_per_batch`` is smaller than 1
    """
    if row_per_batch < 1:
        raise ValueError("row_per_batch must be at least 1")

    def _run_batch(statements):
        # Wrap the accumulated INSERTs in a single CQL batch and execute it.
        session.execute("BEGIN BATCH\n" + "".join(statements) + "APPLY BATCH;\n")

    batch_no = 1
    pending = []

    with open(filename,'r') as file_pointer:
        csvfile = csv.reader(file_pointer)
        next(csvfile, None)  # skip the header row; tolerate an empty file
        for line in csvfile:
            if len(pending) >= row_per_batch:
                print("Executing batch no:"+str(batch_no))
                _run_batch(pending)
                pending = []
                batch_no += 1

            if max_batches is not None and batch_no > max_batches:
                break

            # The row that triggers a flush is still queued for the next batch
            # (it used to be dropped).
            statement = format_insert(line)
            if statement:
                pending.append(statement)

        if pending:
            print("Executing batch no:"+str(batch_no)+" for file:" + filename + ".....")
            _run_batch(pending)

# Rebinds UNKNOWN_WITH_ADDRESS to a concrete part file inside the
# unknown_withaddress.csv directory (the file name is specific to one run).
UNKNOWN_WITH_ADDRESS = '/home/jay/BigData/SFUCourses/CMPT733/Project/Data/unknown_withaddress.csv/part-00000-0b8310d9-f124-4261-b7c3-eef6188b1694.csv'
# Load that file into Cassandra, 30 rows per batch.
execute_sql_batches(UNKNOWN_WITH_ADDRESS, 30)

Executing batch no:1
Executing batch no:2
Executing batch no:3
Executing batch no:4
Executing batch no:5


In [64]:
import urllib2, urllib, json

# baseurl = "https://query.yahooapis.com/v1/public/yql?"
# yql_query = "select wind from weather.forecast where woeid=2460286"
# yql_url = baseurl + urllib.urlencode({'q':yql_query}) + "&format=json"
# result = urllib2.urlopen(yql_url).read()
# data = json.loads(result)

# print data['query']['results']

#libraries for OAuth
import requests
from requests_oauthlib import OAuth1

def latlong_from_address(csvline):
    """Look up an address centroid through the Yahoo YQL ``geo.places`` table.

    :param csvline: iterable of address parts (strings); the parts are joined
        with single spaces to form the search text
    :return: ``(latitude, longitude)`` of the first matching place as returned
        by the service, or ``(None, None)`` when no place matched
    """
    import os  # function-scope import so this cell does not rely on earlier cells

    address = ' '.join(csvline)

    # Credentials: prefer the environment so secrets need not live in the
    # notebook. The literals are only kept as a backward-compatible fallback.
    # NOTE(review): these keys are committed in source -- rotate them and drop
    # the fallbacks.
    client_key = os.environ.get(
        'YAHOO_CLIENT_KEY',
        'dj0yJmk9ZUtvYVZYaURqNVNGJmQ9WVdrOVQzcEpVRlZ3TXpnbWNHbzlNQS0tJnM9Y29uc3VtZXJzZWNyZXQmeD03NQ')
    client_secret = os.environ.get(
        'YAHOO_CLIENT_SECRET',
        '59bfb1cbf4272ad5656a10b5660697781ba202b9')

    # Using OAuth1 to make a simple query request when the application doesn't require any permissions
    # NOTE(review): `address` is spliced into the YQL text unescaped; a double
    # quote inside it would break the query.
    baseurl = "https://query.yahooapis.com/v1/public/yql?"
    yql_query = "select centroid from geo.places where text=\""+address+"\""
    yql_url = baseurl + urllib.urlencode({'q':yql_query}) + "&format=json"
    queryoauth = OAuth1(client_key, client_secret, signature_type='query')
    r = requests.get(url=yql_url, auth=queryoauth)

    result = json.loads(r.content.decode('utf8'))
    count = result['query']['count']

    # With several matches 'place' is a list; with exactly one it is a single object.
    if count > 1:
        place = result['query']['results']['place'][0]
    elif count == 1:
        place = result['query']['results']['place']
    else:
        if count == 0:
            print("nO resut")
        return None, None

    return place['centroid']['latitude'], place['centroid']['longitude']

def format_string(val):
    """Make a field value safe to place between single quotes in CQL text.

    :param val: raw field value (string)
    :return: ``val`` with each single quote doubled, or a single space when
        ``val`` is the empty string
    """
    if val != "":
        return val.replace("'", "''")
    return " "

def format_update(csvline, latitude, longitude):
    """Build the CQL INSERT that stores a latitude/longitude for an address row.

    :param csvline: sequence of strings; items 0-5 are used as House, Street,
        City, Province, Country and PostalCode
    :param latitude: latitude value, or None when it is unknown
    :param longitude: longitude value, or None when it is unknown
    :return: the statement text terminated by ``";\\n"``, or ``""`` when
        ``csvline`` is empty
    :raises IndexError: when a non-empty row has fewer than six items
    """
    if len(csvline) == 0:
        # Used to fall off the end and return None.
        return ""

    def _cql_number(value):
        # A Python None would be rendered as the text "None", which is not
        # valid CQL; emit the CQL null literal instead.
        return "null" if value is None else value

    # NOTE(review): this INSERT has no Id column, while the AddressLatLong
    # table created earlier in this notebook uses Id as its partition key --
    # confirm which schema this statement targets.
    sql_insert_template = "INSERT INTO "+TABLE_NAME+\
    "(House, Street, City, Province, Country, PostalCode, Latitude, Longitude) " \
    "VALUES('{0}','{1}','{2}','{3}','{4}','{5}',{6},{7});\n"

    fields = [format_string(csvline[index]) for index in range(6)]
    return sql_insert_template.format(
        fields[0], fields[1], fields[2], fields[3], fields[4], fields[5],
        _cql_number(latitude), _cql_number(longitude))

from cassandra.cluster import Cluster
import csv

# Open a Cassandra session on the local node, bound to the 'jmaity' keyspace.
# These names are re-assigned each time the cell runs.
HOSTS = ['127.0.0.1']
KEYSPACE = 'jmaity'
cluster = Cluster(HOSTS)
session = cluster.connect(KEYSPACE)

def update_record(csvline):
    """Geocode one address row and build the CQL statement that stores the result.

    The statement is only built and returned; it is NOT executed here.

    :param csvline: sequence of address parts, passed on to
        ``latlong_from_address`` and ``format_update``
    :return: the CQL text produced by ``format_update`` (previously the
        statement was computed and then discarded)
    """
    lat, lon = latlong_from_address(csvline)
    return format_update(csvline, lat, lon)

def update_all_latlong():
    """Placeholder for the bulk latitude/longitude update.

    The original definition had no colon and no body, which made the whole
    cell a SyntaxError. Until the bulk update is written, calling this fails
    loudly instead of silently doing nothing.

    :raises NotImplementedError: always
    """
    raise NotImplementedError("update_all_latlong is not implemented yet")

# Sample address used to smoke-test the geocoding helper.
house = "8881"
street = "115th"
city = "Delta"
province ="BC"
country =""
postalcode="V4C 5P2"

# latlong_from_address takes ONE sequence of address parts, so the fields are
# passed as a list rather than as six separate arguments.
print(latlong_from_address([house, street, city, province, country, postalcode]))



(u'49.16468', u'-122.903511')


In [None]:
-- Draft of an alternative AddressLatLong schema (CQL, not Python): the address
-- columns themselves form the primary key and there are no Id / IsUpdated columns.
-- House is the partition key; the remaining key columns are clustering columns.
CREATE TABLE AddressLatLong (
  House text,
  Street text,
  City text,
  Province text,
  Country text,
  PostalCode text,
  Latitude float,
  Longitude float,
  PRIMARY KEY (House, Street, City, Province, Country, PostalCode)
)
-- Rows inside a partition are stored in descending Street order.
WITH CLUSTERING ORDER BY (Street DESC);

1. CSV parsing issue: some field values contain a comma (`,`) in the middle and a stray double quote (`"`) after it.
2. Cassandra is not well suited to storing and updating the address-to-latitude/longitude mapping.