# Computing User Profiles with Spark
#### Scott, J. A., *Getting Started with Apache Spark: From Inception to Production*, Chapter 5

In this example we use a large dataset from a music streaming service to build a simple real-time dashboard that provides insight into customer behaviours.

## Spark setup
The following code adds the PySpark libraries bundled with the Spark installation to the Python path, imports the required modules and initialises a Spark context.

In [1]:
import sys

spark_home = "/usr/local/spark-1.6.0-bin-hadoop2.6/"
sys.path.append(spark_home+"python/")
sys.path.append(spark_home+"python/lib/pyspark.zip")
sys.path.append(spark_home+"python/lib/py4j-0.9-src.zip")

try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    from pyspark.mllib.stat import Statistics
    print("Successfully imported Spark modules")
except ImportError as err:
    print("Failed to import Spark modules:", err)
    sys.exit(1)
    
conf = SparkConf().setAppName("ListenerSummariser")
sc = SparkContext(conf=conf)

Successfully imported Spark modules
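The cell above hard-codes both the Spark install path and the py4j version, so it breaks as soon as either changes. As a minimal sketch, assuming a `SPARK_HOME` environment variable points at the install, the same three `sys.path` entries can be derived by a hypothetical helper, `spark_python_paths`, that looks up whichever `py4j-*-src.zip` archive the release bundles:

```python
import glob
import os


def spark_python_paths(spark_home):
    """Return the sys.path entries needed to import pyspark from a Spark install.

    Hypothetical helper: mirrors the three hard-coded entries above, but finds
    the bundled py4j archive by pattern instead of by version number.
    """
    lib_dir = os.path.join(spark_home, "python", "lib")
    py4j_zips = sorted(glob.glob(os.path.join(lib_dir, "py4j-*-src.zip")))
    if not py4j_zips:
        raise IOError("No py4j source archive found in " + lib_dir)
    return [
        os.path.join(spark_home, "python"),
        os.path.join(lib_dir, "pyspark.zip"),
        # A Spark release normally ships exactly one py4j archive;
        # if several match, take the last one in name order
        py4j_zips[-1],
    ]
```

With this in place, `sys.path.extend(spark_python_paths(os.environ["SPARK_HOME"]))` could stand in for the three `append` calls.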


## Downloading the data
The data for this project is in the form of CSV files that can be downloaded from:

- https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/tracks.csv (events data)
- https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/cust.csv (customer data)

The music streaming service has users who are continuously connecting to it and listening to music tracks. The file `tracks.csv` consists of a collection of (simulated) customer events, one per line; each event records a customer listening to a track, and the dataset spans a period of several months. The file does not include a header line.

Field name | Type | Description
-----------|------|------------
EventID | integer | Numerical ID of the event
CustID | integer | Numerical ID of customer
TrackID | integer | Numerical ID of track
EventDate | string | Date and time of the event
mobile | integer | Indicates whether the user was listening on a mobile device
listenzip | integer | Zip code of geolocation where track was selected
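Because `tracks.csv` has no header, the field positions in the table above are the only schema. A small sketch that turns one raw line into a named record, assuming every line has exactly these six comma-separated fields with no embedded commas; `TrackEvent` and `parse_track_event` are illustrative names, and the sample line is made up:

```python
from collections import namedtuple

# Field order follows the tracks.csv table above (the file has no header line)
TrackEvent = namedtuple(
    "TrackEvent",
    ["event_id", "cust_id", "track_id", "event_date", "mobile", "listen_zip"],
)


def parse_track_event(line):
    """Parse one raw tracks.csv line into a TrackEvent with typed fields."""
    event_id, cust_id, track_id, event_date, mobile, listen_zip = line.strip().split(",")
    return TrackEvent(
        event_id=int(event_id),
        cust_id=int(cust_id),
        track_id=int(track_id),
        event_date=event_date,  # left as a string: "<date> <HH:MM:SS>"
        mobile=int(mobile),     # 1 if played on a mobile device, else 0
        listen_zip=listen_zip,  # kept as a string so leading zeros survive
    )


# A made-up line in the documented field order
event = parse_track_event("0,48,453,2014-10-23 03:26:20,0,72132\n")
```

Here `event.cust_id` is `48` and `event.track_id` is `453`. The zip code is deliberately not converted to an integer, since that would drop any leading zeros.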

The customer dataset in the file `cust.csv` contains all the statically known details about each user.

Field name | Type | Description
-----------|------|------------
CustID | integer | Unique numerical ID of customer
Name | string | Full name of customer
Gender | integer | Gender of customer
Address | string | Address of customer
zip | integer | Zip code of customer's address
SignDate | string | Date of addition to the service
Status | integer | Indicates whether or not the account is active (0 = closed, 1 = active)
Level | integer | Indicates level of service (0 = Free, 1 = Silver, 2 = Gold)
Campaign | integer | Indicates the campaign under which the user joined:<br> **NONE**: no campaign<br> **30DAYFREE**: a '30-day free trial' offer<br> **SUPERBOWL**: a Super Bowl-related program<br> **RETAILSTORE**: an offer originating in a brick-and-mortar retail store<br> **WEBOFFER**: an offer for web-originated customers
LinkedWithApps | integer | 
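The integer codes for `Status` and `Level` are only meaningful alongside the table above. A minimal sketch of decoding them for display, using exactly the code values listed there; `describe_account` is a hypothetical helper, and `Campaign` is left out because the table does not say which integer corresponds to which campaign name:

```python
# Code tables taken from the cust.csv description above
STATUS_LABELS = {0: "closed", 1: "active"}
LEVEL_LABELS = {0: "Free", 1: "Silver", 2: "Gold"}


def describe_account(status, level):
    """Translate the integer Status and Level codes into a readable label.

    Codes outside the documented ranges are reported explicitly rather
    than silently mapped to a default.
    """
    status_label = STATUS_LABELS.get(status, "unknown status %d" % status)
    level_label = LEVEL_LABELS.get(level, "unknown level %d" % level)
    return "%s (%s)" % (level_label, status_label)
```

For example, `describe_account(1, 2)` returns `"Gold (active)"`.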


In [2]:
import os
import urllib2  # Python 2 module; on Python 3 use urllib.request.urlopen

baseurl = "https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/"
datadir = "source_data/"

# Create a directory to store downloaded data (if it doesn't exist already)
if not os.path.exists(datadir):
    os.makedirs(datadir)

# Download the events data (tracks.csv) and the customer data (cust.csv),
# skipping any file that has already been downloaded
for fname in ["tracks.csv", "cust.csv"]:
    target = os.path.join(datadir, fname)
    if not os.path.isfile(target):
        response = urllib2.urlopen(baseurl + fname)
        with open(target, "wb") as dfile:
            dfile.write(response.read())
        response.close()

## Customer analysis
The first step is to read the CSV records of individual track events and turn them into a pair RDD keyed by customer ID. `map()` converts each line into a (customer ID, [event]) pair, where the event holds the track ID, date, mobile flag and listening zip code; `reduceByKey()` then concatenates the event lists of each customer into a single list.

In [12]:
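# Each line of tracks.csv: EventID,CustID,TrackID,EventDate,mobile,listenzip
trackfile = sc.textFile("source_data/tracks.csv")

def make_tracks_kv(line):
    """Turn one CSV line into a (customer ID, [[track ID, date, mobile, zip]]) pair."""
    fields = line.strip().split(",")
    return (fields[1], [[int(fields[2]), fields[3], int(fields[4]), fields[5]]])

# Make a k,v RDD out of the input data: one entry per customer,
# holding the concatenated list of all of that customer's events
tbycust = trackfile.map(make_tracks_kv).reduceByKey(lambda a, b: a + b)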
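To see what `map()` followed by `reduceByKey(lambda a, b: a + b)` produces without a cluster, here is a pure-Python stand-in that applies the same fold to a list of hypothetical (customer ID, [event]) pairs. `local_reduce_by_key` is an illustrative helper, not a PySpark API:

```python
from functools import reduce


def local_reduce_by_key(pairs, func):
    """Pure-Python stand-in for RDD.reduceByKey on a list of (key, value) pairs.

    Values that share a key are folded together with func, in input order.
    """
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


# Hypothetical (customer ID, [event]) pairs, shaped like make_tracks_kv output
pairs = [
    ("48", [[453, "2014-10-23 03:26:20", 0, "72132"]]),
    ("1021", [[19, "2014-10-23 09:10:02", 1, "17307"]]),
    ("48", [[6, "2014-10-24 18:45:51", 1, "72132"]]),
]
by_cust = local_reduce_by_key(pairs, lambda a, b: a + b)
# by_cust["48"] now holds both of customer 48's events in a single list
```

Spark applies the reduce function within and across partitions in no fixed order, so the events in each customer's list are not guaranteed to be in file order; that does not matter here because the per-user statistics ignore ordering. Since every `a + b` builds a new list, `groupByKey()` (followed by `mapValues(list)`) is a common alternative for collecting all the values of each key.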

Individual track events are now stored in a *PairRDD*, with the customer ID as the key. A summary profile can now be computed for each user, which will include:

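- Number of tracks played during each period of the day (the time ranges are arbitrarily defined in the code: night is before 05:00 or from 22:00, morning 05:00 to 11:59, afternoon 12:00 to 16:59, evening 17:00 to 21:59)
- Total unique tracks, i.e. the number of distinct track IDs
- Total mobile tracks, i.e. tracks played when the `mobile` flag was set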
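The hour boundaries are easiest to check in isolation. A sketch of the same bucketing as a standalone function, assuming `EventDate` has the form `<date> <HH:MM:SS>` (which is what the per-user code below relies on); `period_of_day` is a hypothetical name:

```python
from collections import Counter


def period_of_day(event_date):
    """Map a '<date> <HH:MM:SS>' timestamp to a period of the day.

    Boundaries mirror compute_stats_byuser: night is before 05:00 or from
    22:00 on, morning 05:00-11:59, afternoon 12:00-16:59, evening 17:00-21:59.
    """
    time_part = event_date.split(" ")[1]
    hour = int(time_part.split(":")[0])
    if hour < 5 or hour >= 22:
        return "night"
    if hour < 12:
        return "morning"
    if hour < 17:
        return "afternoon"
    return "evening"


# Made-up timestamps, counted by period
counts = Counter(period_of_day(d) for d in [
    "2014-10-23 03:26:20",
    "2014-10-23 09:10:02",
    "2014-10-24 22:00:00",
])
```

Here `counts` records two night plays and one morning play.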

In [8]:
def compute_stats_byuser(tracks):
    """Summarise one user's events as
    [unique tracks, morning, afternoon, evening, night, mobile tracks]."""
    mcount = morn = aft = eve = night = 0
    trackset = set()  # a set makes the uniqueness check O(1)
    for trackid, dtime, mobile, listenzip in tracks:
        trackset.add(trackid)
        # EventDate is "<date> <time>"; the hour is the first field of the time
        hourofday = int(dtime.split(" ")[1].split(":")[0])
        mcount += mobile  # mobile is 0 or 1
        if hourofday < 5:
            night += 1
        elif hourofday < 12:
            morn += 1
        elif hourofday < 17:
            aft += 1
        elif hourofday < 22:
            eve += 1
        else:
            night += 1
    return [len(trackset), morn, aft, eve, night, mcount]

In [9]:
# Compute the profile for each user; cache it because it is used twice below
custdata = tbycust.mapValues(compute_stats_byuser).cache()

# Compute column-wise summary statistics (mean, variance, ...) over all user profiles
aggdata = Statistics.colStats(custdata.values())
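`Statistics.colStats()` returns a `MultivariateStatisticalSummary` whose `mean()` holds one value per column, i.e. the average of each profile field across all users. A plain-Python sketch of that column-wise mean on two hypothetical profiles, useful for checking results on small data; `column_means` is an illustrative helper:

```python
def column_means(rows):
    """Column-wise mean of equal-length numeric rows.

    Plain-Python illustration of what Statistics.colStats(...).mean()
    reports for an RDD of profile vectors; Spark computes it distributively.
    """
    rows = list(rows)
    if not rows:
        raise ValueError("column_means() needs at least one row")
    n = float(len(rows))
    return [sum(column) / n for column in zip(*rows)]


# Hypothetical profiles: [unique, morn, aft, eve, night, mobile]
profiles = [
    [2, 1, 0, 1, 0, 1],
    [4, 0, 3, 1, 2, 0],
]
means = column_means(profiles)
```

Here `means` is `[3.0, 0.5, 1.5, 1.0, 1.0, 0.5]`; the summary object returned by `colStats()` also exposes `variance()`, `min()` and `max()` per column.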

Calling `collect()` on this RDD brings the per-user results back to the driver program, where they can be persisted. These results could be stored in a database, but here for simplicity we'll write them to two CSV files:

- `live_table.csv`, which contains the latest per-user profiles
- `agg_table.csv`, which contains the aggregated (mean) values across all users
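Neither output file carries a header, so a reader has to know the column order. A sketch of writing the live table with a header row, shown against an in-memory buffer on Python 3; the column names and `write_profiles` are hypothetical, chosen to match the order of the profile vector plus a leading customer ID:

```python
import csv
import io

# Hypothetical column names for the live table
LIVE_HEADER = ["cust_id", "unique_tracks", "morning", "afternoon",
               "evening", "night", "mobile_tracks"]


def write_profiles(out, profiles):
    """Write a header row, then one row per (cust_id, profile) pair."""
    writer = csv.writer(out)
    writer.writerow(LIVE_HEADER)
    for cust_id, profile in profiles:
        writer.writerow([cust_id] + list(profile))


buf = io.StringIO()
write_profiles(buf, [("48", [2, 1, 0, 1, 0, 1])])
```

Passing a file opened with `open(path, "w", newline="")` instead of the buffer would write the same content to disk on Python 3.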

In [16]:
import csv
import os

if not os.path.exists("output/"):
    os.makedirs("output/")

# Persist the per-user profiles. The file is opened once ("wb" for the
# Python 2 csv module), so re-running the cell overwrites instead of appending.
# Rows are comma-delimited, matching the .csv extension.
with open("output/live_table.csv", "wb") as csvfile:
    fwriter = csv.writer(csvfile)
    for k, v in custdata.collect():
        unique, morn, aft, eve, night, mobile = v
        # Include the customer ID so each row can be traced back to a user
        fwriter.writerow([k, unique, morn, aft, eve, night, mobile])

# Do the same with the summary data: mean() must be called before indexing
# (it returns a NumPy array), and tolist() yields all six column means
with open("output/agg_table.csv", "wb") as csvfile:
    fwriter = csv.writer(csvfile)
    fwriter.writerow(aggdata.mean().tolist())