![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) + ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)
# Getting Top 5 Breweries and Styles for each User.

In [1]:
# Echo the SparkContext's default parallelism; the notebook displays the value below.
sc.defaultParallelism

4

### **Preliminaries**
#### We read in the allBeer.txt file and create an RDD consisting of lines.
#### We want to remove the header from the file, so the parseDatafileLine function splits each line into an array of fields and attaches a flag: 0 for the header line (the one starting with 'beer_id'), 1 for lines with the correct number of fields, and -1 for malformed lines.

In [2]:
import re

def parseDatafileLine(datafileLine, expectedFields=23):
    """Split one tab-separated line of the data file into fields and classify it.

    Double-quote characters are removed from every field. No regular
    expression is involved; the line is split on tab characters only.

    Args:
        datafileLine: one raw line of text from the data file.
        expectedFields: number of fields a valid record must contain
            (defaults to 23).

    Returns:
        A (fields, flag) tuple. fields is the list of cleaned field strings;
        flag is 0 when the first field is 'beer_id' (the header line), 1 when
        the line has exactly expectedFields fields, and -1 otherwise
        (a failed parse).
    """
    fields = [field.replace('"', '') for field in datafileLine.split("\t")]
    # str.split always yields at least one element, so fields[0] is safe even
    # for an empty line; short lines must be flagged -1, not raise.
    if fields[0] == 'beer_id':
        return (fields, 0)
    if len(fields) != expectedFields:
        ##this is a failed parse
        return (fields, -1)
    return (fields, 1)

### Reading the file
#### We parse the file as above and split the result into three RDDs: the header RDD, the failed RDD and the valid RDD. We print the header names so we can remember which fields we're dealing with and in what order.

In [3]:
import sys
import os

# Directory prefix that loadData joins onto data file names; os.path.join('') evaluates to ''.
baseDir = os.path.join('')
# Name of the tab-separated beer review file passed to loadData.
allBeer_Path = 'AllBeer.txt'
# NOTE(review): not referenced anywhere in the visible part of this notebook.
STOPWORDS_PATH = 'stopwords.txt'

def parseData(filename):
    """Read a text file with Spark and classify every line.

    Args:
        filename: path of the data file to read.

    Returns:
        A cached RDD whose elements are the (fields, flag) tuples produced
        by parseDatafileLine.
    """
    # textFile arguments: path, minimum number of partitions, decode as unicode.
    rawLines = sc.textFile(filename, 4, True)
    parsedLines = rawLines.map(parseDatafileLine)
    return parsedLines.cache()

def loadData(path):
    """Load a data file and separate valid records, failures and the header.

    Prints up to ten lines that failed to parse, each header column with its
    index, and a summary of the line counts.

    Args:
        path: data file name, joined onto baseDir.

    Returns:
        A (valid, headerDict) tuple: valid is a cached RDD of field lists for
        the correctly parsed lines, headerDict maps column index to the
        column name read from the header line.
    """
    raw = parseData(os.path.join(baseDir, path)).cache()

    def fieldsFlagged(flag):
        # Field lists of every parsed line carrying the given flag.
        return raw.filter(lambda s: s[1] == flag).map(lambda s: s[0])

    failed = fieldsFlagged(-1)
    for badLine in failed.take(10):
        print('%s - Invalid datafile line: %s' % (path, badLine))

    valid = fieldsFlagged(1).cache()
    header = fieldsFlagged(0)

    headerDict = {}
    # take(1) returns a list with at most one header line.
    for columnNames in header.take(1):
        for position, columnName in enumerate(columnNames):
            headerDict[position] = columnName
            print('%s %s' % (position, columnName))

    rawLines = raw.count()
    validLines = valid.count()
    failedLines = failed.count()
    summary = '%s - Read %d lines, successfully parsed %d lines, failed to parse %d lines' % (
        path, rawLines, validLines, failedLines)
    print(summary)
    return valid, headerDict
    
# allBeer is the RDD of valid field lists; headers maps column index -> column name.
allBeer,headers = loadData(allBeer_Path)
#allReviews = loadData(allReviews_Path)

0 beer_id
1 beer_name
2 brewer_name
3 beer_style
4 distribution
5 brewery_location
6 commercial_desc
7 RATINGS: 
8 MEAN (/5)
9 WEIGHTED AVG
10 EST. CALORIES
11 ABV (%)
12 IBU
13 SCORE
14 AROMA (/10)
15 APPEARANCE(/5)
16 TASTE(/10)
17 PALATE(/5)
18 OVERALL(/20)
19 reviewer_name
20 review_location
21 review_date
22 review_content
AllBeer.txt - Read 620388 lines, successfully parsed 620387 lines, failed to parse 0 lines


### Let's examine the first five fields of a random sample of 5 lines to check that things look OK.

In [4]:
# Draw 5 records without replacement, using seed 1 so the sample is repeatable.
sampleArray=allBeer.takeSample(False,5,1)
for line in sampleArray:
    # Every valid record should report 23 fields.
    print len(line)
    # Show beer_id, beer_name, brewer_name, beer_style and distribution (fields 0-4).
    print 'allBeer: %s, %s, %s, %s, %s\n' % (line[0], line[1], line[2],line[3],line[4])

23
allBeer: 62903, duclaw-bourbon-barrel-serum, DuClaw Brewing Company, Imperial IPA, distribution unknown

23
allBeer: 106807, atlas-red-squirrel, Atlas (Sinclair Breweries), Bitter, distribution unknown

23
allBeer: 108406, fiddler-ale, De Haagsche Bierbrouwerij, Bitter, distribution unknown

23
allBeer: 67592, hoganas-apa, Höganäs Bryggeri, American Pale Ale, distribution unknown

23
allBeer: 78504, beach-city-kickout-double-ipa, Beach City Brewery, Imperial IPA, Local Distribution



### Clean the Data
#### Since we depend heavily on the number of reviews each individual gives, we need to remove users who have an unacceptably low number. Because we're dividing the data into training and test sets and the goal is to predict an ordering, we should have at least 3 beers per user in the test set. If we split 80/20, this means at least 12 beer reviews overall. We are also going to purge any blank reviews immediately.

In [5]:
##Purge blank reviews and make (k,V) pairs
# Key each record by reviewer_name (field 19), then drop records whose review_content (field 22) is empty.
nonEmpty = allBeer.map(lambda x:(x[19],x)).filter(lambda (x,y):y[22]!='')
##Convert strings to floats for 13 through 18
def convertStrings(inputList,indexList):
    """Return a copy of inputList with the chosen positions converted to float.

    Args:
        inputList: sequence of values (strings in this notebook).
        indexList: positions whose values are passed through float().

    Returns:
        A new list of the same length; values at positions listed in
        indexList are floats, all others are unchanged.

    Raises:
        ValueError: if a selected value cannot be parsed by float().
    """
    return [float(value) if position in indexList else value
            for position, value in enumerate(inputList)]
# Fields 13-18 are SCORE, AROMA, APPEARANCE, TASTE, PALATE and OVERALL in the header listing.
convIndicies=[13,14,15,16,17,18]
# Same (reviewer_name, record) pairs, with the rating fields converted from strings to floats.
convertedToFloats = nonEmpty.map(lambda (x,y):(x,convertStrings(y,convIndicies)))

##Remove users with less than 12 reviews.
def removeUsers(inputRDD,minReviews):
    """Drop the records of every key that has fewer than minReviews records.

    Args:
        inputRDD: RDD of (key, record) pairs.
        minReviews: minimum number of records a key needs to be kept.

    Returns:
        An RDD of (key, record) pairs restricted to keys that occur at
        least minReviews times in inputRDD.
    """
    reviewCounts = (inputRDD
                    .map(lambda pair: (pair[0], 1))
                    .reduceByKey(lambda a, b: a + b))
    # After the join every element is (key, (record, reviewCount)).
    withCounts = inputRDD.join(reviewCounts)
    frequentOnly = withCounts.filter(lambda pair: pair[1][1] >= minReviews)
    return frequentOnly.map(lambda pair: (pair[0], pair[1][0]))

##Keep only reviews whose review_location (field 20) contains the given location, e.g. to remove users not in Ontario.
def filterByLocation(inputRDD,location):
    """Keep only the records whose field 20 contains the given location.

    The comparison is a case-insensitive substring test: both the field and
    the search text are lower-cased, so 'Ontario' and 'ontario' select the
    same records.

    Args:
        inputRDD: RDD of (key, record) pairs; record[20] is the text searched
            (review_location in the header listing).
        location: text to look for, in any letter case.

    Returns:
        The filtered RDD, with elements of the same (key, record) shape.
    """
    # Lower-case the search text once here rather than once per record.
    needle = location.lower()
    return inputRDD.filter(lambda pair: needle in pair[1][20].lower())

# Keep only users with at least 12 reviews; cached because several actions below reuse it.
beerByUser = removeUsers(convertedToFloats,12).cache()
# Distinct keys (reviewer_name), distinct beer_id values (field 0), and total records.
print "There are %d users in the dataset." % beerByUser.map(lambda (x,y):x).distinct().count()
print "There are %d beers in the dataset." % beerByUser.map(lambda (x,y):y[0]).distinct().count()
print "There are %d reviews in the dataset." % beerByUser.count()

There are 4005 users in the dataset.
There are 99891 beers in the dataset.
There are 591212 reviews in the dataset.


In [28]:
import unicodedata
import MySQLdb
def getUsersFromDatabase():
    """Read the users table of the BeerRatings MySQL database into a dict.

    Returns:
        A dict mapping the second column of each row, decoded from UTF-8,
        to the first column of that row. If two rows share the same second
        column, the later row wins.
    """
    # NOTE(review): the credentials are hard-coded in the notebook; consider
    # loading them from configuration or the environment instead.
    mydb = MySQLdb.connect(host='localhost',
        user='pythonconnector',
        passwd='python1029',
        db='BeerRatings')
    # try/finally guarantees the cursor and the connection are released even
    # when the query or the fetch raises.
    try:
        cursor = mydb.cursor()
        try:
            cursor.execute("SELECT * FROM users")
            queryResults = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        mydb.close()
    # NOTE(review): SELECT * makes the result depend on the table's column
    # order (presumably id first, name second) - confirm against the schema.
    return {result[1].decode('utf-8'): result[0] for result in queryResults}

userDict = getUsersFromDatabase()
# Peek at a few of the user names used as dictionary keys.
print userDict.keys()[0:5]

# Re-key every review by the value looked up in userDict for its reviewer name.
# NOTE(review): userDict[x] raises KeyError for a reviewer missing from the users table - confirm every reviewer is present.
beerById = beerByUser.map(lambda (x,y):(userDict[x],y))

[u'nazzty', u'Manetsdad', u'Bewitched', u'Nejhleader', u'CObiased']


### Let's find each user's favourite styles, so we can then output them to a database

In [29]:
import operator
def topNfromDict(inputDict,N):
    """Return a dict with the N entries of inputDict that have the largest values.

    Entries are ranked by value in descending order; among equal values the
    entries met first while iterating inputDict are preferred.

    Args:
        inputDict: dict whose values are mutually comparable.
        N: number of entries to keep (non-negative).

    Returns:
        A new dict with at most N items; inputDict is not modified.
    """
    # Local import keeps this function self-contained in the notebook cell.
    import heapq
    # nlargest avoids sorting the whole dict when only N entries are wanted,
    # and items() works on both Python 2 and Python 3 (iteritems does not).
    return dict(heapq.nlargest(N, inputDict.items(), key=operator.itemgetter(1)))
    
def reduceIntoDict(a,b):
    """Merge two {name: (ratingSum, count)} dicts into a new dict.

    Args:
        a: dict mapping a name to a (ratingSum, count) tuple.
        b: dict of the same shape.

    Returns:
        A new dict holding every key of a and b. For keys present in both,
        the two tuples are added element by element; other keys keep their
        original tuple. Neither input is modified.
    """
    merged = dict(a)
    for name, totals in b.items():
        if name in merged:
            earlier = merged[name]
            merged[name] = (earlier[0] + totals[0], earlier[1] + totals[1])
        else:
            merged[name] = totals
    return merged

def convertDict(inputDict,styleDict):
    """Turn per-name (ratingSum, count) totals into (average, share) pairs.

    Args:
        inputDict: dict mapping a name to a (ratingSum, count) tuple.
        styleDict: dict whose keys are the names to report; its values are
            not read.

    Returns:
        A dict with one entry per key of styleDict. Names present in
        inputDict map to (ratingSum / count, count / total count over all of
        inputDict); names absent from inputDict map to (0, 0).
    """
    totalCount = sum(entry[1] for entry in inputDict.values())
    summary = {}
    for name in styleDict:
        if name not in inputDict:
            summary[name] = (0, 0)
            continue
        entry = inputDict[name]
        summary[name] = (entry[0] / entry[1], float(entry[1]) / totalCount)
    return summary

import pickle
# Collect every distinct beer_style (field 3) to the driver.
beerStyles = beerByUser.map(lambda (x,y):y[3]).distinct().collect()
styleDict={}
for style in beerStyles:
    # Only the keys matter: convertDict iterates styleDict and never reads these placeholder tuples.
    styleDict[style]=(0,0)  ##count and sum
    
# Number of reviews per style.
countPerSum = beerById.map(lambda (x,y):(y[3],1)).reduceByKey(lambda a,b:a+b)
# Sum of SCORE (field 13) per style, divided by the review count: the overall average score per style.
avgPerStyle = beerById.map(lambda (x,y):(y[3],y[13])).reduceByKey(lambda a,b:a+b).join(countPerSum).map(lambda (x,(y,z)):(x,y/z)).collect()
avgPerStyleDict = {a[0]:a[1] for a in avgPerStyle}
#print avgPerStyleDict
# Persist the overall style averages as a pickle file.
with open('/usr/ALSServer/dbfiles/styleAverages.txt', 'wb') as f:
    pickle.dump(avgPerStyleDict,f)
# Per user: {style: (score sum, review count)}, merged across that user's reviews.
styleSums = beerById.map(lambda(x,y):(x,{y[3]:(y[13],1)})).reduceByKey(reduceIntoDict)
# Per user: {style: (average score, share of the user's reviews)} for every known style.
styleAvgAndFreq=styleSums.map(lambda (x,y):(x,convertDict(y,styleDict)))
# Keep the 5 largest (average, share) entries for each user and bring the result to the driver as a list.
styleAvgAndFreqTop5=styleAvgAndFreq.map(lambda (x,y):(x,topNfromDict(y,5))).collect()
with open('/usr/ALSServer/dbfiles/top5StylesPerUser.txt', 'wb') as f:
    pickle.dump(styleAvgAndFreqTop5,f)
#print styleAvgAndFreqTop5.takeSample(False,1,5)


### Output top 5 breweries per person

In [30]:
# Collect every distinct brewer_name (field 2) to the driver.
breweries = beerByUser.map(lambda (x,y):y[2]).distinct().collect()
brewerDict={}
for brewer in breweries:
    # Only the keys matter: convertDict iterates brewerDict and never reads these placeholder tuples.
    brewerDict[brewer]=(0,0)  ##count and sum
    
# Number of reviews per brewer. This rebinds the countPerSum name used in the styles cell.
countPerSum = beerById.map(lambda (x,y):(y[2],1)).reduceByKey(lambda a,b:a+b)
# Sum of SCORE (field 13) per brewer, divided by the review count: the overall average score per brewer.
avgPerBrewer = beerById.map(lambda (x,y):(y[2],y[13])).reduceByKey(lambda a,b:a+b).join(countPerSum).map(lambda (x,(y,z)):(x,y/z)).collect()
avgPerBrewerDict = {a[0]:a[1] for a in avgPerBrewer}
#print avgPerBrewerDict
# Persist the overall brewer averages as a pickle file.
with open('/usr/ALSServer/dbfiles/breweryAverages.txt', 'wb') as f:
    pickle.dump(avgPerBrewerDict,f)
# Per user: {brewer: (score sum, review count)}, merged across that user's reviews.
brewerSums = beerById.map(lambda(x,y):(x,{y[2]:(y[13],1)})).reduceByKey(reduceIntoDict)
# Per user: {brewer: (average score, share of the user's reviews)} for every known brewer.
brewerAvgAndFreq=brewerSums.map(lambda (x,y):(x,convertDict(y,brewerDict)))
# Keep the 5 largest (average, share) entries for each user and bring the result to the driver as a list.
brewerAvgAndFreqTop5=brewerAvgAndFreq.map(lambda (x,y):(x,topNfromDict(y,5))).collect()
with open('/usr/ALSServer/dbfiles/top5BrewersPerUser.txt', 'wb') as f:
    pickle.dump(brewerAvgAndFreqTop5,f)
# Show the first five (user id, top-5 dict) pairs.
print brewerAvgAndFreqTop5[0:5]

[(1540L, {u'Brouwerij Emelisse': (4.0, 0.003424657534246575), u'City of Cambridge (Wolf)': (4.0, 0.003424657534246575), u'Brooklyn Brewery': (4.5, 0.003424657534246575), u'Duelund Bryglade': (4.3, 0.003424657534246575), u'Great Divide Brewing Company': (4.1, 0.003424657534246575)}), (3080L, {u'Spearhead Brewing Company': (4.1, 0.045454545454545456), u'Smithworks Brewing Company': (4.0, 0.045454545454545456), u'Railway City Brewing Company': (3.1, 0.045454545454545456), u'Martens': (3.1, 0.045454545454545456), u'Box Steam': (3.1, 0.045454545454545456)}), (1036L, {u'Southern Star Brewing Company': (4.6, 0.007518796992481203), u'Deschutes Brewery': (4.3, 0.007518796992481203), u'Stone Brewing Company': (4.0, 0.03007518796992481), u'Victory Brewing Company': (4.1, 0.015037593984962405), u'Founders Brewing Company': (4.0, 0.015037593984962405)}), (2576L, {u'Browar Cornelius': (3.7, 0.02702702702702703), u'Browar Artezan': (3.8, 0.02702702702702703), u'Browar Kingpin': (4.1, 0.02702702702702