# Computing User Profiles with Spark

This use case will bring together the core concepts of Spark and use a large dataset to build a simple real-time dashboard that provides insight into customer behaviors.

Contributed by: Nick Amato, Director, Technical Marketing for MapR

## Delivering Music

Music streaming is a pervasive technology that generates massive quantities of data. This use case analyzes data from a streaming service of the kind people use every day on a desktop or mobile device, either as subscribers or as free listeners (a service similar to Pandora, for example).

The basic setup consists of customers who log into this service and listen to music tracks. Each customer has a variety of attributes:

- Demographic information (gender, location, etc.)
- Free or paid subscription
- Listening history: the tracks selected, when they were selected, and the geolocation at the time

Python, PySpark and MLlib will be used to compute some basic statistics for a dashboard, enabling a high-level view of customer behaviors as well as a constantly updated view of the latest information.

## Looking at the Data

The data will be loaded directly from CSV files. A couple of steps are needed before it can be analyzed: the data must be transformed and loaded into a PairRDD, which is an RDD whose elements are (key, value) tuples. Keying each record by customer ID makes it possible to group and summarize the events per customer.

The customer events dataset (tracks.csv) records individual track plays, one event per line, where each event is a customer listening to a track. The file is approximately 1M lines long and contains simulated listener events spanning several months. Because each line records a single low-level event, this data has the potential to grow very large.

The event, customer and track IDs show that a customer listened to a specific track. The other fields show associated information, like whether the customer was listening on a mobile device, and a geolocation. This will serve as the input into the first Spark job.
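The column order of tracks.csv is not spelled out here, but the way make_tracks_kv indexes each split line in the code further down suggests six comma-separated fields: event ID, customer ID, track ID, timestamp, mobile flag, and zip code. The sketch below parses one row into named fields under that assumption. TrackEvent and parse_track_line are hypothetical names, and the sample row is invented in the shape of the output shown later.

```python
import csv
from typing import NamedTuple


class TrackEvent(NamedTuple):
    """One listening event; column order inferred from make_tracks_kv (assumption)."""
    event_id: str
    customer_id: str
    track_id: int
    timestamp: str   # "YYYY-MM-DD HH:MM:SS"
    mobile: int      # 1 if played on a mobile device, else 0
    zip_code: str    # kept as text so leading zeros survive


def parse_track_line(line: str) -> TrackEvent:
    # csv.reader removes the double quotes around the timestamp and zip fields
    event_id, customer_id, track_id, timestamp, mobile, zip_code = next(csv.reader([line]))
    return TrackEvent(event_id, customer_id, int(track_id), timestamp, int(mobile), zip_code)


# Hypothetical row
event = parse_track_line('0,4446,511,"2014-12-04 03:35:27",1,"23081"')
print(event.customer_id, event.track_id, event.timestamp, event.mobile, event.zip_code)
```

Unlike the plain `str.split(",")` used in the notebook, csv.reader strips the double quotes, so the timestamp and zip code come back as clean strings.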

The customer information dataset (cust.csv) consists of all statically known details about a user.

The fields are defined as follows:

- Customer ID: a unique identifier for that customer
- Name, gender, address, zip: the customer's associated information
- Sign date: the date the customer joined the service
- Status: whether the account is active (0 = closed, 1 = active)
- Level: the service level, where 0, 1, and 2 mean Free, Silver, and Gold, respectively
- Campaign: the campaign under which the user joined, one of the following (fictional) campaigns driven by our (also fictional) marketing team:
    - NONE: no campaign
    - 30DAYFREE: a '30 days free' trial offer
    - SUPERBOWL: a Super Bowl-related program
    - RETAILSTORE: an offer originating in brick-and-mortar retail stores
    - WEBOFFER: an offer for web-originated customers
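As an illustration of how these codes might be decoded, here is a sketch that assumes cust.csv has no header row and stores the fields in the order listed above. The column names, the parse_customers helper, and the sample row are all hypothetical. The csv module is used rather than a plain split because an address can itself contain a comma.

```python
import csv
import io

# Assumed column order, taken from the field list above (not verified against the file)
CUST_FIELDS = ["customer_id", "name", "gender", "address", "zip",
               "sign_date", "status", "level", "campaign"]
LEVELS = {0: "Free", 1: "Silver", 2: "Gold"}
CAMPAIGNS = {"NONE", "30DAYFREE", "SUPERBOWL", "RETAILSTORE", "WEBOFFER"}


def parse_customers(text):
    """Yield one dict per cust.csv row, decoding the status and level codes."""
    for row in csv.DictReader(io.StringIO(text), fieldnames=CUST_FIELDS):
        if row["campaign"] not in CAMPAIGNS:
            raise ValueError(f"unknown campaign: {row['campaign']!r}")
        row["active"] = row["status"] == "1"          # 0 = closed, 1 = active
        row["level_name"] = LEVELS[int(row["level"])]
        yield row


# Hypothetical row; the quoted address contains a comma
sample = '10,"Jane Doe",F,"1 Main St, Springfield",01101,2014-06-01,1,2,WEBOFFER\n'
customers = list(parse_customers(sample))
print(customers[0]["level_name"], customers[0]["active"])  # Gold True
```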

Other datasets that would be available, but are not used in this use case, include:

- Advertisement click history
- Track details like title, album, and artist


## Customer Analysis

All the right information is in place, and plenty of micro-level detail describes what customers listen to and when. The quickest way to get this data to a dashboard is to use Spark to create summary information for each customer as well as basic statistics about the entire user base. The results can then be persisted to a file that is easy to visualize with BI tools such as Tableau, or with dashboarding frameworks like C3.js or D3.js.

The first step is to initialize a Spark context. Additional settings can be passed to the SparkConf object to further configure the job, such as the master URL (setMaster()) or the Spark installation path on worker nodes (setSparkHome()).

```python
from pyspark import SparkContext, SparkConf
from pyspark.mllib.stat import Statistics
import csv

# Further settings (e.g. .setMaster(...)) can be chained onto SparkConf here
conf = SparkConf().setAppName('ListenerSummarizer')
# Initialize the Spark context
sc = SparkContext(conf=conf)
```

The next step is to read the CSV records containing the individual track events and build a PairRDD from them. The map() function converts each line into a (customer ID, [event]) pair, and reduceByKey() then concatenates those single-event lists, so that each customer ID ends up associated with one list of all of that customer's events.
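The map-then-reduceByKey pattern can be reproduced in plain Python to see what each step produces. In this sketch, to_pair mirrors make_tracks_kv from the next cell, and local_reduce_by_key is a hypothetical stand-in for Spark's reduceByKey(). The input lines are invented, and customer "17" is made up for illustration.

```python
def to_pair(line):
    """Mirror of make_tracks_kv: (customer ID, [[track ID, timestamp, mobile, zip]])."""
    fields = line.split(",")
    return fields[1], [[int(fields[2]), fields[3], int(fields[4]), fields[5]]]


def local_reduce_by_key(pairs, func):
    """Plain-Python stand-in for RDD.reduceByKey: merge values that share a key."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Hypothetical input lines
lines = [
    '0,4446,511,"2014-12-04 03:35:27",1,"23081"',
    '1,4446,527,"2014-11-26 21:31:06",0,"96142"',
    '2,17,90,"2014-11-24 15:33:56",1,"28391"',
]
by_customer = local_reduce_by_key(map(to_pair, lines), lambda a, b: a + b)
print({key: len(events) for key, events in by_customer.items()})  # {'4446': 2, '17': 1}
```

Wrapping each event in an outer list is what lets `+` act as the merge function. In Spark, the order in which partial lists are merged across partitions is not fixed, so the order of events within a customer's list should not be relied on; groupByKey() is another way to collect all of a key's values.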

```python
# Load tracks.csv as an RDD of text lines
tracks_file = sc.textFile("./raw/tracks.csv")

def make_tracks_kv(line):
    fields = line.split(",")
    # Key: customer ID (fields[1]).
    # Value: a one-element list holding [track ID, timestamp, mobile flag, zip],
    # so that reduceByKey can concatenate the lists with +
    return (fields[1], [[int(fields[2]), fields[3], int(fields[4]), fields[5]]])

# Make a key/value (pair) RDD out of the input data
tracks_rdd = tracks_file.map(make_tracks_kv) \
                        .reduceByKey(lambda a, b: a + b)
tracks_rdd.first()

# Equivalent version using inline lambdas:
#tracks_rdd = tracks_file.map(lambda line: line.split(",")) \
#                        .map(lambda p: [p[1], [[int(p[2]), p[3], int(p[4]), p[5]]]]) \
#                        .reduceByKey(lambda a,b: a+b)
```

```
(u'4446',
 [[511, u'"2014-12-04 03:35:27"', 1, u'"23081"'],
  [527, u'"2014-11-26 21:31:06"', 0, u'"96142"'],
  [1318, u'"2014-11-12 21:30:24"', 1, u'"49751"'],
  [1129, u'"2014-10-21 10:17:16"', 1, u'"78567"'],
  [667, u'"2014-12-10 09:26:40"', 1, u'"44622"'],
  [90, u'"2014-11-24 15:33:56"', 1, u'"28391"'],
  [1291, u'"2014-11-07 04:04:03"', 0, u'"55168"'],
  [1128, u'"2014-12-19 02:51:00"', 0, u'"07501"'],
  [22, u'"2014-10-09 07:55:57"', 0, u'"58121"'],
  [271, u'"2014-11-30 14:36:02"', 0, u'"36663"'],
  [587, u'"2014-12-10 22:23:31"', 0, u'"06232"'],
  [606, u'"2014-12-17 22:37:04"', 1, u'"38037"'],
  [141, u'"2014-10-05 02:55:38"', 1, u'"19126"'],
  [11, u'"2014-12-24 22:28:50"', 1, u'"20650"'],
  [1036, u'"2014-10-30 07:50:21"', 0, u'"28220"'],
  [1616, u'"2014-11-28 08:56:29"', 1, u'"02638"'],
  [1375, u'"2014-12-28 13:04:35"', 0, u'"67878"'],
  [1267, u'"2014-11-10 13:27:23"', 1, u'"10028"'],
  [390, u'"2014-11-22 14:38:49"', 1, u'"10307"'],
  [283, u'"2014-10-09 12:59:21"', 1
```

The individual track events are now stored in a PairRDD, with the customer ID as the key. A summary profile can now be computed for each user, which will include:

- Number of tracks played during each period of the day: morning, afternoon, evening, and night (the time ranges are arbitrarily defined in the code)
- Total unique tracks, i.e., the number of distinct track IDs
- Total mobile tracks, i.e., tracks played when the mobile flag was set
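The period-of-day boundaries used in compute_stats_by_user can be isolated into a small function: night is before 05:00 or from 22:00 on, morning runs from 05:00 to 11:59, afternoon from 12:00 to 16:59, and evening from 17:00 to 21:59. period_of_day is a hypothetical helper; it strips the literal double quotes that the naive comma split leaves around each timestamp.

```python
def period_of_day(timestamp):
    """Return the period bucket that compute_stats_by_user would use for a timestamp."""
    # The naive comma split leaves the field as '"YYYY-MM-DD HH:MM:SS"'
    _, time_part = timestamp.strip('"').split(" ")
    hour = int(time_part.split(":")[0])
    if hour < 5:
        return "night"
    if hour < 12:
        return "morning"
    if hour < 17:
        return "afternoon"
    if hour < 22:
        return "evening"
    return "night"


for ts in ['"2014-12-04 03:35:27"', '"2014-10-21 10:17:16"',
           '"2014-11-24 15:33:56"', '"2014-11-26 21:31:06"', '"2014-12-10 22:23:31"']:
    print(ts, period_of_day(ts))
```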

Passing a function to mapValues() computes this high-level profile from each customer's list of events. The resulting summary data is then readily available for computing basic statistics for display, such as the per-field mean and variance across all users, using the colStats() function from pyspark.mllib.stat.
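mapValues() transforms each value while leaving its key untouched. The dictionary-based sketch below mimics that behavior with a reduced profile of unique-track and mobile-play counts; local_map_values and unique_and_mobile are hypothetical names, and the grouped events are invented.

```python
def local_map_values(pairs, func):
    """Plain-Python stand-in for RDD.mapValues: apply func to each value, keep each key."""
    return {key: func(value) for key, value in pairs.items()}


def unique_and_mobile(events):
    """Reduced profile: [number of distinct track IDs, number of mobile plays]."""
    # Each event is [track ID, timestamp, mobile flag, zip code]
    return [len({event[0] for event in events}), sum(event[2] for event in events)]


# Hypothetical grouped data, shaped like the elements of tracks_rdd
per_customer = {
    "4446": [[511, '"2014-12-04 03:35:27"', 1, '"23081"'],
             [527, '"2014-11-26 21:31:06"', 0, '"96142"'],
             [511, '"2014-11-12 21:30:24"', 1, '"49751"']],
    "17": [[90, '"2014-11-24 15:33:56"', 1, '"28391"']],
}
profiles = local_map_values(per_customer, unique_and_mobile)
print(profiles)  # {'4446': [2, 2], '17': [1, 1]}
```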

```python
def compute_stats_by_user(tracks):
    mcount = morning = afternoon = evening = night = 0
    unique_tracks = set()
    for trackid, dtime, mobile, zip_code in tracks:
        unique_tracks.add(trackid)
        # dtime looks like '"2014-12-04 03:35:27"'; extract the hour
        _, time_of_day = dtime.strip('"').split(" ")
        hour_of_day = int(time_of_day.split(":")[0])
        # Count tracks played on a mobile device
        mcount += mobile
        # Count tracks played during each period of the day
        if hour_of_day < 5:
            night += 1
        elif hour_of_day < 12:
            morning += 1
        elif hour_of_day < 17:
            afternoon += 1
        elif hour_of_day < 22:
            evening += 1
        else:
            night += 1
    # Return only after every track has been processed (outside the loop).
    # len(unique_tracks): total unique tracks, mcount: total mobile tracks
    return [len(unique_tracks), morning, afternoon, evening, night, mcount]

# Compute a profile for each user.
# mapValues() passes each value of the pair RDD through a function
# without changing the keys; it also retains the original RDD's partitioning.
customer_data = tracks_rdd.mapValues(compute_stats_by_user)

# Compute aggregate statistics over all user profiles,
# such as the mean and variance of each field in customer_data
aggregate_data = Statistics.colStats(customer_data.map(lambda x: x[1]))
```

The last line computes meaningful statistics, such as the mean and variance, for each field of the per-user profiles stored in the customer_data RDD.
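colStats() returns a summary object whose mean() and variance() methods give one value per column; for MLlib's summarizer, the variance is the unbiased sample variance (dividing by n - 1). The plain-Python sketch below computes the same per-column quantities over a few hypothetical profile rows laid out as [unique, morning, afternoon, evening, night, mobile]. column_stats is a hypothetical helper, not part of MLlib.

```python
import statistics


def column_stats(rows):
    """Column-wise mean and unbiased sample variance of equal-length numeric rows."""
    columns = list(zip(*rows))
    means = [statistics.fmean(col) for col in columns]
    variances = [float(statistics.variance(col)) for col in columns]  # divides by n - 1
    return means, variances


# Hypothetical per-user profiles
rows = [[2, 1, 0, 1, 0, 2],
        [1, 0, 1, 0, 0, 1],
        [3, 2, 1, 0, 0, 0]]
means, variances = column_stats(rows)
print(means[0], variances[0])  # 2.0 1.0
```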

Calling collect() on customer_data brings the per-user results back to the driver, which then writes them to a file. The results could instead be stored in a database such as MapR-DB, HBase, or an RDBMS (using a Python package like happybase or dbset). For simplicity, this example uses CSV files. There are two files to output:

- live_table.csv, containing the latest per-user calculations
- agg_table.csv, containing aggregated data about all customers, computed with Statistics.colStats
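If the results were stored in an RDBMS instead, the write could look like the following sketch. It uses Python's built-in sqlite3 module purely as a stand-in (it is not one of the databases or packages named above), and the table name, column names, and rows are hypothetical, shaped like the (customer ID, profile) pairs that customer_data.collect() returns.

```python
import sqlite3

# Hypothetical collected rows: (customer ID, [unique, morning, afternoon, evening, night, mobile])
collected = [("4446", [2, 1, 0, 1, 0, 2]),
             ("17", [1, 0, 1, 0, 0, 1])]

conn = sqlite3.connect(":memory:")  # use a file path instead to persist the table
conn.execute("""CREATE TABLE live_table (
    customer_id TEXT PRIMARY KEY,
    unique_tracks INTEGER, morning INTEGER, afternoon INTEGER,
    evening INTEGER, night INTEGER, mobile INTEGER)""")
# INSERT OR REPLACE keeps only the latest profile for each customer ID
conn.executemany("INSERT OR REPLACE INTO live_table VALUES (?, ?, ?, ?, ?, ?, ?)",
                 [(key, *values) for key, values in collected])
conn.commit()
print(conn.execute("SELECT COUNT(*), SUM(mobile) FROM live_table").fetchone())  # (2, 3)
```

Using the customer ID as the primary key means each re-run overwrites a customer's row with the latest profile instead of appending duplicates, which suits a table of the latest calculations.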


```python
# Write the latest per-customer calculations to a file.
# Open in text mode with newline='' as the csv module expects in Python 3.
with open("./raw/live_table.csv", "w", newline="") as live_file:
    fwriter = csv.writer(live_file, delimiter=' ',
                         quotechar='|', quoting=csv.QUOTE_MINIMAL)
    # collect() brings every (customer ID, profile) pair back to the driver
    for key, value in customer_data.collect():
        unique, morning, afternoon, evening, night, mobile = value
        # Note: the customer ID (key) is not written to the row
        fwriter.writerow([unique, morning, afternoon, evening, night, mobile])

# Write the aggregated data about all customers computed with Statistics.colStats:
# a single row holding the mean of each profile field
with open("./raw/agg_table.csv", "w", newline="") as agg_file:
    fwriter = csv.writer(agg_file, delimiter=' ',
                         quotechar='|', quoting=csv.QUOTE_MINIMAL)
    fwriter.writerow(aggregate_data.mean().tolist())
```
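Both files use a space delimiter and `|` as the quote character rather than the usual comma and double quote, so whatever loads them for the dashboard has to use the same settings. The round-trip sketch below writes hypothetical profile rows to an in-memory buffer with those settings and reads them back; DIALECT is a hypothetical name for the shared keyword arguments.

```python
import csv
import io

# Same writer settings as the notebook cells: space delimiter, '|' as the quote character
DIALECT = {"delimiter": " ", "quotechar": "|", "quoting": csv.QUOTE_MINIMAL}

# Hypothetical profile rows: [unique, morning, afternoon, evening, night, mobile]
profiles = [[2, 1, 0, 1, 0, 2], [1, 0, 1, 0, 0, 1]]

buffer = io.StringIO(newline="")  # stands in for live_table.csv
csv.writer(buffer, **DIALECT).writerows(profiles)
print(repr(buffer.getvalue()))    # '2 1 0 1 0 2\r\n1 0 1 0 0 1\r\n'

buffer.seek(0)
loaded = [[int(v) for v in row] for row in csv.reader(buffer, **DIALECT)]
print(loaded == profiles)         # True
```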