In [1]:
#read files - beers.csv and breweries.csv
beerPath = "file:///Users/raghumyneni/spark/ipython files/kaggel/beer/beers.csv"
breweriesPath = "file:///Users/raghumyneni/spark/ipython files/kaggel/beer/breweries.csv"
beerRawData = sc.textFile(beerPath)
breweriesRawData = sc.textFile(breweriesPath)

In [2]:
#beer data
beerRawData.take(10)

[u',abv,ibu,id,name,style,brewery_id,ounces',
 u'0,0.05,,1436,Pub Beer,American Pale Lager,408,12.0',
 u"1,0.066,,2265,Devil's Cup,American Pale Ale (APA),177,12.0",
 u'2,0.071,,2264,Rise of the Phoenix,American IPA,177,12.0',
 u'3,0.09,,2263,Sinister,American Double / Imperial IPA,177,12.0',
 u'4,0.075,,2262,Sex and Candy,American IPA,177,12.0',
 u'5,0.077,,2261,Black Exodus,Oatmeal Stout,177,12.0',
 u'6,0.045,,2260,Lake Street Express,American Pale Ale (APA),177,12.0',
 u'7,0.065,,2259,Foreman,American Porter,177,12.0',
 u'8,0.055,,2258,Jade,American Pale Ale (APA),177,12.0']

In [3]:
#breweries data
breweriesRawData.take(10)

[u',name,city,state',
 u'0,NorthGate Brewing ,Minneapolis, MN',
 u'1,Against the Grain Brewery,Louisville, KY',
 u"2,Jack's Abby Craft Lagers,Framingham, MA",
 u'3,Mike Hess Brewing Company,San Diego, CA',
 u'4,Fort Point Beer Company,San Francisco, CA',
 u'5,COAST Brewing Company,Charleston, SC',
 u'6,Great Divide Brewing Company,Denver, CO',
 u'7,Tapistry Brewing,Bridgman, MI',
 u'8,Big Lake Brewing,Holland, MI']

In [4]:
#separate header from data
beerHeader = 'beer_row_num' + beerRawData.first()
beerHeader

u'beer_row_num,abv,ibu,id,name,style,brewery_id,ounces'

In [5]:
#separate data
#discard rows with missing data
beerData = beerRawData.filter(lambda x: 'name,style,brewery_id' not in x) \
                    .filter(lambda x: ',,' not in x) \
                    .map(lambda x: x.split(","))
beerData.first()

[u'14',
 u'0.061',
 u'60.0',
 u'1979',
 u'Bitter Bitch',
 u'American Pale Ale (APA)',
 u'177',
 u'12.0']

In [6]:
#beer class
from collections import namedtuple
Beer = namedtuple('Beer', beerHeader.split(","), verbose=True)

class Beer(tuple):
    'Beer(beer_row_num, abv, ibu, id, name, style, brewery_id, ounces)'

    __slots__ = ()

    _fields = ('beer_row_num', 'abv', 'ibu', 'id', 'name', 'style', 'brewery_id', 'ounces')

    def __new__(_cls, beer_row_num, abv, ibu, id, name, style, brewery_id, ounces):
        'Create new instance of Beer(beer_row_num, abv, ibu, id, name, style, brewery_id, ounces)'
        return _tuple.__new__(_cls, (beer_row_num, abv, ibu, id, name, style, brewery_id, ounces))

    @classmethod
    def _make(cls, iterable, new=tuple.__new__, len=len):
        'Make a new Beer object from a sequence or iterable'
        result = new(cls, iterable)
        if len(result) != 8:
            raise TypeError('Expected 8 arguments, got %d' % len(result))
        return result

    def __repr__(self):
        'Return a nicely formatted representation string'
        return 'Beer(beer_row_num=%r, abv=%r, ibu=%r, id=%r, name=%r, style=%r, brewery_id=%r, ounces=%r)' % self

    def _asdict(s

In [7]:
#parse beer data
def parseBeer(row):
    row[0] = int(row[0])
    row[1] = float(row[1])
    row[2] = float(row[2])
    row[3] = int(row[3])
    row[6] = int(row[6])
    row[7] = float(row[7])
    return Beer(*row[:8])

In [8]:
beerParsed = beerData.map(parseBeer)
beerParsed.first()

Beer(beer_row_num=14, abv=0.061, ibu=60.0, id=1979, name=u'Bitter Bitch', style=u'American Pale Ale (APA)', brewery_id=177, ounces=12.0)

In [9]:
#separate header from data
breweryHeader = 'brewery_id' + breweriesRawData.first()
breweryHeader

u'brewery_id,name,city,state'

In [10]:
#separate data
#discard rows with missing data 
#cleanup the rows
breweryData = breweriesRawData.filter(lambda x: 'name,city,state' not in x) \
                    .filter(lambda x: ',,' not in x) \
                    .map(lambda x: x.replace(" ", "")) \
                    .map(lambda x: x.split(","))
breweryData.first()

[u'0', u'NorthGateBrewing', u'Minneapolis', u'MN']

In [11]:
Brewery = namedtuple('Brewery', breweryHeader.split(","), verbose=True)

class Brewery(tuple):
    'Brewery(brewery_id, name, city, state)'

    __slots__ = ()

    _fields = ('brewery_id', 'name', 'city', 'state')

    def __new__(_cls, brewery_id, name, city, state):
        'Create new instance of Brewery(brewery_id, name, city, state)'
        return _tuple.__new__(_cls, (brewery_id, name, city, state))

    @classmethod
    def _make(cls, iterable, new=tuple.__new__, len=len):
        'Make a new Brewery object from a sequence or iterable'
        result = new(cls, iterable)
        if len(result) != 4:
            raise TypeError('Expected 4 arguments, got %d' % len(result))
        return result

    def __repr__(self):
        'Return a nicely formatted representation string'
        return 'Brewery(brewery_id=%r, name=%r, city=%r, state=%r)' % self

    def _asdict(self):
        'Return a new OrderedDict which maps field names to their values'
        return OrderedDict(zip(self._fields, self))

    def _replace(_self, **kwds):
        'Return a n

In [12]:
#parse brewery data
def parseBrewery(row):
    row[0] = int(row[0])
    return Brewery(*row[:4])

breweryParsed = breweryData.map(parseBrewery)
breweryParsed.first()

Brewery(brewery_id=0, name=u'NorthGateBrewing', city=u'Minneapolis', state=u'MN')

In [13]:
# Beer Style and Total IBU
beerStyleIBUAndCount = beerParsed.map(lambda x: (x.style, x.ibu)) \
                            .combineByKey((lambda value: (value, 1)), \
                                         (lambda acc, value: (acc[0] + value, acc[1] + 1)), \
                                         (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
    
beerStyleIBUAndCount.take(10)

[(u'Tripel', (94.0, 4)),
 (u'Maibock / Helles Bock', (55.0, 2)),
 (u'American IPA', (20358.0, 301)),
 (u'Belgian Pale Ale', (191.0, 8)),
 (u'Fruit / Vegetable Beer', (426.0, 30)),
 (u'Berliner Weissbier', (39.0, 5)),
 (u'Baltic Porter', (162.0, 3)),
 (u'Irish Dry Stout', (116.0, 3)),
 (u'Light Lager', (35.0, 3)),
 (u'California Common / Steam Beer', (82.0, 2))]

In [14]:
# Average IBU for Beer Style sorted
beerStyleAvgIBU = beerStyleIBUAndCount.map(lambda x: (x[0], x[1][0]/x[1][1]))
beerStyleAvgIBU.sortBy(lambda x: -x[1]).collect()

[(u'American Barleywine', 96.0),
 (u'American Double / Imperial IPA', 93.32),
 (u'Russian Imperial Stout', 86.5),
 (u'American Double / Imperial Pilsner', 85.0),
 (u'Belgian Strong Dark Ale', 72.0),
 (u'American Black Ale', 68.9),
 (u'American IPA', 67.63455149501661),
 (u'English Barleywine', 66.66666666666667),
 (u'English Stout', 66.0),
 (u'American Strong Ale', 65.41666666666667),
 (u'American India Pale Lager', 63.333333333333336),
 (u'American Double / Imperial Stout', 62.0),
 (u'Belgian IPA', 57.0),
 (u'English India Pale Ale (IPA)', 54.714285714285715),
 (u'Baltic Porter', 54.0),
 (u'English Strong Ale', 54.0),
 (u'Rye Beer', 52.0),
 (u'American White IPA', 48.833333333333336),
 (u'Extra Special / Strong Bitter (ESB)', 45.714285714285715),
 (u'American Pale Ale (APA)', 44.94117647058823),
 (u'American Stout', 41.3125),
 (u'California Common / Steam Beer', 41.0),
 (u'Old Ale', 40.0),
 (u'Irish Dry Stout', 38.666666666666664),
 (u'Foreign / Export Stout', 38.666666666666664),
 (u

In [15]:
# Beer Style and Total ABV
beerStyleABVAndCount = beerParsed.map(lambda x: (x.style, x.abv)) \
                            .combineByKey((lambda value: (value, 1)), \
                                         (lambda acc, value: (acc[0] + value, acc[1] + 1)), \
                                         (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
    
beerStyleABVAndCount.take(10)

[(u'Tripel', (0.359, 4)),
 (u'Maibock / Helles Bock', (0.113, 2)),
 (u'American IPA', (19.506999999999994, 301)),
 (u'Belgian Pale Ale', (0.43200000000000005, 8)),
 (u'Fruit / Vegetable Beer', (1.5400000000000005, 30)),
 (u'Berliner Weissbier', (0.195, 5)),
 (u'Baltic Porter', (0.281, 3)),
 (u'Irish Dry Stout', (0.16999999999999998, 3)),
 (u'Light Lager', (0.10900000000000001, 3)),
 (u'California Common / Steam Beer', (0.10700000000000001, 2))]

In [16]:
# Average ABV for Beer Style sorted
beerStyleAvgABV = beerStyleABVAndCount.map(lambda x: (x[0], x[1][0]/x[1][1]))
beerStyleAvgABV.sortBy(lambda x: -x[1]).collect()

[(u'English Barleywine', 0.10766666666666667),
 (u'Russian Imperial Stout', 0.09949999999999999),
 (u'American Barleywine', 0.099),
 (u'American Double / Imperial Stout', 0.09666666666666668),
 (u'Baltic Porter', 0.09366666666666668),
 (u'Belgian Strong Dark Ale', 0.092),
 (u'Quadrupel (Quad)', 0.09),
 (u'Tripel', 0.08975),
 (u'American Double / Imperial IPA', 0.08769333333333336),
 (u'Belgian Strong Pale Ale', 0.08349999999999999),
 (u'English Strong Ale', 0.08233333333333333),
 (u'English Stout', 0.08),
 (u'Doppelbock', 0.07849999999999999),
 (u'Belgian Dark Ale', 0.078),
 (u'American Strong Ale', 0.07608333333333334),
 (u'Scotch Ale / Wee Heavy', 0.0756923076923077),
 (u'American Double / Imperial Pilsner', 0.075),
 (u'Dubbel', 0.07450000000000001),
 (u'American Black Ale', 0.07315),
 (u'Old Ale', 0.07200000000000001),
 (u'American Wild Ale', 0.07150000000000001),
 (u'Scottish Ale', 0.07145454545454545),
 (u'Belgian IPA', 0.07100000000000001),
 (u'Winter Warmer', 0.0695),
 (u'Bi\xe8

In [17]:
#Join ABV and IBU for Beer Style
beerStyleAvgIBUAndAvgIBU = beerStyleAvgIBU.join(beerStyleAvgABV)
beerStyleAvgIBUAndAvgIBU.map(lambda x: (x[0], x[1][0] * x[1][1])) \
                        .sortBy(lambda x: -x[1]).collect()

[(u'American Barleywine', 9.504000000000001),
 (u'Russian Imperial Stout', 8.60675),
 (u'American Double / Imperial IPA', 8.183541866666669),
 (u'English Barleywine', 7.177777777777779),
 (u'Belgian Strong Dark Ale', 6.624),
 (u'American Double / Imperial Pilsner', 6.375),
 (u'American Double / Imperial Stout', 5.993333333333334),
 (u'English Stout', 5.28),
 (u'Baltic Porter', 5.058000000000001),
 (u'American Black Ale', 5.0400350000000005),
 (u'American Strong Ale', 4.9771180555555565),
 (u'English Strong Ale', 4.446),
 (u'American IPA', 4.383213275791657),
 (u'Belgian IPA', 4.047000000000001),
 (u'American India Pale Lager', 3.968888888888889),
 (u'Rye Beer', 3.4377777777777774),
 (u'English India Pale Ale (IPA)', 3.400102040816327),
 (u'American White IPA', 3.0276666666666667),
 (u'Old Ale', 2.8800000000000003),
 (u'Extra Special / Strong Bitter (ESB)', 2.6285714285714286),
 (u'Foreign / Export Stout', 2.539111111111111),
 (u'American Stout', 2.5355546875),
 (u'American Pale Ale (AP

In [18]:
# number of breweries per state
breweriesPerState = breweryParsed.map(lambda x: (x.state, 1)) \
                                .reduceByKey(lambda x,y: x+y)
breweriesPerState.sortBy(lambda x: -x[1]).collect()

[(u'CO', 47),
 (u'CA', 39),
 (u'MI', 32),
 (u'OR', 29),
 (u'TX', 28),
 (u'PA', 25),
 (u'WA', 23),
 (u'MA', 23),
 (u'IN', 22),
 (u'WI', 20),
 (u'NC', 19),
 (u'IL', 18),
 (u'NY', 16),
 (u'VA', 16),
 (u'FL', 15),
 (u'OH', 15),
 (u'MN', 12),
 (u'AZ', 11),
 (u'VT', 10),
 (u'ME', 9),
 (u'MO', 9),
 (u'MT', 9),
 (u'CT', 8),
 (u'AK', 7),
 (u'GA', 7),
 (u'MD', 7),
 (u'OK', 6),
 (u'IA', 5),
 (u'LA', 5),
 (u'NE', 5),
 (u'RI', 5),
 (u'ID', 5),
 (u'WY', 4),
 (u'SC', 4),
 (u'KY', 4),
 (u'HI', 4),
 (u'NM', 4),
 (u'UT', 4),
 (u'NJ', 3),
 (u'TN', 3),
 (u'KS', 3),
 (u'AL', 3),
 (u'NH', 2),
 (u'NV', 2),
 (u'MS', 2),
 (u'DE', 2),
 (u'AR', 2),
 (u'ND', 1),
 (u'NorthWoodstock', 1),
 (u'DC', 1),
 (u'WV', 1),
 (u'SD', 1)]

In [19]:
# Beer IBU to ABV frequency distribution
beerIBUToABVAndCount = beerParsed.map(lambda x: (int(x.ibu/10), x.abv)) \
                            .combineByKey((lambda value: (value, 1)), \
                                         (lambda acc, value: (acc[0] + value, acc[1] + 1)), \
                                         (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
    
# Frequency Distribution
beerIBUToABVFrqDist = beerIBUToABVAndCount.map(lambda x: (x[0], x[1][0]/x[1][1]))
beerIBUToABVFrqDist.sortBy(lambda x: -x[0]).collect()

[(13, 0.077),
 (12, 0.08650000000000001),
 (11, 0.083),
 (10, 0.08038709677419355),
 (9, 0.08186486486486486),
 (8, 0.07835714285714288),
 (7, 0.0705047619047619),
 (6, 0.0666241610738255),
 (5, 0.062432098765432105),
 (4, 0.05627096774193549),
 (3, 0.05681407035175878),
 (2, 0.05362251655629136),
 (1, 0.05106018518518515),
 (0, 0.048300000000000023)]

In [20]:
# ABV and IBU are almost directly proportional, but 
# ABV ~ Alcohol by Volume
# IBU ~ International Bittering Units