# Music Data Analysis using Apache Spark on Databricks Community Edition
In this notebook, we demonstrate various Apache Spark concepts, such as transformations and actions. The examples are executed using several of the APIs available in Apache Spark, such as RDDs and DataFrames.

The notebook is written and executed on Databricks Community Edition. A guide to getting started with Databricks Community Edition can be found <a href="https://docs.databricks.com/user-guide/index.html">here</a>. An introduction to Apache Spark with Databricks can be found <a href="https://docs.databricks.com/spark/latest/training/index.html">here</a>.

This example is inspired by <a href="https://www.mapr.com/blog/real-time-user-profiles-spark-drill-and-mapr-db">this blog post</a> on the MapR Academy blog. The data was acquired from the <a href="https://github.com/mapr/mapr-demos/tree/master/spark_music_demo">GitHub account</a> of MapR Academy.

### Problem Statement
In a hypothetical music streaming website, customers are constantly connected to the service and are listening to tracks. The data captured from these listening events forms the core dataset of our problem. There are various attributes associated with the customers and the tracks.

### Datasets

1. <a href="https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/tracks.csv"><b>tracks.csv</b></a> - Contains the listening events, one event per line, where each event is a customer listening to a track. The attributes present in the dataset are
 1. Event ID : The unique identifier of the event where the customer is listening to the track (Integer)
 2. Customer ID : The customer Id of the customer listening to the track (Integer)
 3. Track ID : The track Id of the track currently being played (Integer)
 4. DateTime : The date and time at which the customer is listening to the track (String)
 5. Mobile : 1, if the customer is listening to the track on a mobile device else 0 (Integer)
 6. Listening ZIP : The approximate Zip location of the customer listening to the track (Integer)
2. <a href="https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/cust.csv"> <b>cust.csv</b></a> - Contains the details about the customer
 1. Customer ID: The unique identifier of the customer (Integer)
 2. Name, Gender, Address, Zip : The information associated with the customer (String, Integer, String, Integer)
 3. Sign Date : The date the customer has been added to the service (String)
 4. Level : The level of subscription of the customer 0, 1, 2 for Free, Silver and Gold, respectively (Integer)
 5. Other fields: Not used in this example

### Downloading the datasets

Download the datasets using the shell command `wget` and save them into the `/tmp` directory. The URLs for the datasets are
1. tracks.csv : https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/tracks.csv
2. cust.csv : https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/cust.csv

In [3]:
%sh
wget -P /tmp "https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/tracks.csv"
wget -P /tmp "https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/cust.csv"

### Uploading the datasets into Databricks file system

The Databricks File System (DBFS) is a distributed file system layered on top of Amazon S3. We will upload the data from the local file system into DBFS. Below is a Python script which copies the data from the local file system into the datasets folder of the DBFS of your cluster.

Note: The local files are referenced using `file:/` and DBFS files are referenced using `dbfs:/`

In [5]:
localTrackFilePath = "file:/tmp/tracks.csv"
localCustomerFilePath = "file:/tmp/cust.csv"
dbutils.fs.mkdirs("dbfs:/datasets")
dbutils.fs.cp(localTrackFilePath, "dbfs:/datasets/")
dbutils.fs.cp(localCustomerFilePath, "dbfs:/datasets")
#Displaying the files present in the DBFS datasets folder of your cluster
display(dbutils.fs.ls("dbfs:/datasets"))

### Creating named tuples in Python

The downloaded datasets are in CSV format, and the rows in a CSV file have no inherent structure. Named tuples in Python are lightweight immutable objects. The fields of a named tuple instance can be accessed either by name, using object-like attribute dereferencing, or by position, using the standard tuple syntax. This makes the code more readable and more Pythonic.

Below is the creation of named tuples for the Track dataset.

In [7]:
from collections import namedtuple

track_fields = ('event_id', 'customer_id', 'track_id', 'date_time', 'mobile', 'zip')
Track = namedtuple('Track', track_fields, verbose=True)
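To make the two access styles concrete, here is a minimal sketch using the same `Track` definition. The sample values are invented for illustration. The `verbose=True` argument is left out because that parameter was removed in Python 3.7; it only printed the generated class definition.

```python
from collections import namedtuple

track_fields = ('event_id', 'customer_id', 'track_id', 'date_time', 'mobile', 'zip')
Track = namedtuple('Track', track_fields)

# Hypothetical event; the values are invented for this example
sample = Track(0, 48, 453, '2014-10-23 03:26:20', 0, '72132')

# Attribute-style access and index-style access return the same value
by_name = sample.customer_id
by_index = sample[1]

# Named tuples are immutable: assigning to a field raises AttributeError
try:
    sample.mobile = 1
    mutated = True
except AttributeError:
    mutated = False
```

Here `by_name` and `by_index` both hold the customer ID, and `mutated` stays `False` because the assignment is rejected.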

### Lambda functions in Python

In Python, an anonymous function is a function that is defined without a name.
While normal functions are defined using the `def` keyword, anonymous functions are defined using the `lambda` keyword.
Hence, anonymous functions are also called lambda functions.

More about Lambda functions can be found <a href="http://www.programiz.com/python-programming/anonymous-function">here</a>
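As a small illustration, the sketch below defines the same logic once with `def` and once with `lambda`, and then passes a lambda inline to the built-in `map`, which is how lambdas are typically handed to Spark transformations later in this notebook. The function names are made up for the example.

```python
# A named function defined with def
def add(a, b):
    return a + b

# The same logic as an anonymous function, bound to a name only for comparison
add_lambda = lambda a, b: a + b

# Lambdas are usually passed inline to another function
doubled = list(map(lambda x: x * 2, [1, 2, 3]))
```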

### Defining the problem statement

We will analyze the music dataset by answering the following questions
1. Number of unique tracks listened to by a user.
2. Number of mobile tracks listened to by a user.
3. Percentage of tracks listened to by the users during different times of the day.
4. Most popular track and its name.

The analysis is done with Apache Spark transformations and actions. Operations such as `map`, `filter`, `reduceByKey`, `reduce` and `collect` are covered in the example.

#### Loading and creating the first RDD

1. Load the data into an RDD using the Spark context `textFile` method.
2. Define a Python method which parses an individual tracks.csv row and converts it into a named tuple.
3. Perform a `map` transformation, which transforms the tracks RDD into an RDD containing `Track` named tuples.
4. Persist the RDD in memory using the `persist` method, since it will be used in the coming examples.

In [11]:
#Method to parse the tracks and storing it as a Track Named Tuple
def parse_tracks(row):
        track_row = row.split(",")
        event_id = int(track_row[0])
        customer_id = int(track_row[1])
        track_id = int(track_row[2])
        date_time = str(track_row[3])
        mobile = int(track_row[4])
        zip_code = str(track_row[5])
        return Track(event_id,customer_id,track_id,date_time,mobile,zip_code)

#Loading the tracks.csv file using the spark context 
tracks_RDD = sc.textFile("dbfs:/datasets/tracks.csv")
#Parsing the tracks into named tuples and persisting the tracks
tracks_parsed = tracks_RDD.map(parse_tracks).persist()
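Because `parse_tracks` is plain Python, it can be checked on the driver without Spark before it is passed to `map`. The sketch below repeats the definitions so that it runs on its own. The sample line is invented to match the column layout described in the Datasets section; it is not copied from tracks.csv.

```python
from collections import namedtuple

Track = namedtuple('Track', ('event_id', 'customer_id', 'track_id',
                             'date_time', 'mobile', 'zip'))

def parse_tracks(row):
    # Split one CSV line and convert each field to the type used in the notebook
    track_row = row.split(",")
    return Track(int(track_row[0]), int(track_row[1]), int(track_row[2]),
                 str(track_row[3]), int(track_row[4]), str(track_row[5]))

# Hypothetical line: event_id, customer_id, track_id, date_time, mobile, zip
sample_line = "0,48,453,2014-10-23 03:26:20,0,72132"
parsed = parse_tracks(sample_line)
```

The result is a `Track` whose numeric fields are integers, while `date_time` and `zip` remain strings.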

### 1. Number of unique tracks listened to by a user

1. `map` through the tracks_parsed RDD and create a pair RDD with the Customer ID as the key and a list containing the Track ID as the value.
2. Apply a `reduceByKey` operation to combine all the Track IDs listened to by a customer into a single list.
3. Map through the values of the pair RDD and convert the list of all tracks listened to by the user into a set, which automatically removes the duplicates.
4. Map through the values of the resulting pair RDD and find the size of the set of all tracks listened to by a user.
5. Apply the action `take(10)`. Transformations are lazy, so this action is what kicks off the computation of all the RDDs defined by the transformations above.

In [13]:
tracks_pairRDD = tracks_parsed.map(lambda track: (track.customer_id,[track.track_id]))
tracks_by_userRDD = tracks_pairRDD.reduceByKey(lambda a,b: a+b)
unique_tracksRDD = tracks_by_userRDD.mapValues(lambda list_tracks: set(list_tracks))
number_unique_tracksRDD = unique_tracksRDD.mapValues(lambda set_tracks: len(set_tracks))
#Printing the number of tracks listened by 10 users, it can be done for all users similarly by collect() action 
for customer_unique in number_unique_tracksRDD.take(10):
  print "Customer Id:", customer_unique[0], "has listened to", customer_unique[1], "unique tracks"
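To see what each step computes, the same pipeline can be traced on a handful of events in plain Python. This is only a local sketch: `reduce_by_key` is a hypothetical stand-in written for this example, not part of Spark, and the events are invented.

```python
def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for reduceByKey: merges the values of each key with func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Invented (customer_id, track_id) events
events = [(1, 10), (1, 11), (1, 10), (2, 10), (2, 10)]

# Step 1: pair each customer with a one-element list holding the track ID
pairs = [(customer_id, [track_id]) for customer_id, track_id in events]
# Step 2: concatenate the lists of each customer
tracks_by_user = reduce_by_key(pairs, lambda a, b: a + b)
# Step 3: convert each list to a set, which drops duplicates
unique_tracks = {c: set(t) for c, t in tracks_by_user.items()}
# Step 4: count the unique tracks per customer
number_unique = {c: len(s) for c, s in unique_tracks.items()}
```

Customer 1 played track 10 twice and track 11 once, so the count of unique tracks is 2; customer 2 only ever played track 10, so the count is 1.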

#### Displaying the customer names along with their number unique tracks

Using transformations and actions, we were able to compute the number of unique tracks listened to by each customer ID. But what if we want to know the name associated with a customer ID?

One option is to create a dictionary which maps customer IDs to customer names. However, the dictionary resides locally in the driver program, while the tasks are executed on the worker nodes. The dictionary would therefore have to be shipped to the worker nodes along with the tasks, which is expensive. To avoid this, we use broadcast variables.

About broadcast variables, from the <a href="http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables">official documentation</a>:
> Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. Explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

So, we start by reading the cust.csv file from the tmp folder of our local file system, create a dictionary mapping customer IDs to customer names, and create a broadcast variable using the Spark context's `broadcast` method. The value of the broadcast variable can be accessed through its `value` attribute.

After creating the broadcast variable, perform the `take(10)` action on the number_unique_tracksRDD and print the customer name using the `value` attribute.

In [15]:
#Method to create a dictionary which maps Customer ID's to Customer names
def create_cust_dict():
  custDict = {}
  with open("/tmp/cust.csv") as f:
    for line in f:
      fields = line.split(",")
      if(fields[0]!="CustID"):
        custId = int(fields[0])
        cust_name = str(fields[1])
        custDict[custId] = cust_name
  return custDict

# Creating a broadcast variable cust_names from the dictionary returned by the create_cust_dict method above
cust_names = sc.broadcast(create_cust_dict())

#Printing the Customer Names using the broadcast variable value method
for customer_unique in number_unique_tracksRDD.take(10):
  print cust_names.value[customer_unique[0]], "has listened to", customer_unique[1], "unique tracks"

Gregory Koval has listened to 1617 unique tracks
Paula Peltier has listened to 1408 unique tracks
David Garcia has listened to 1306 unique tracks
Marjorie Jackman has listened to 1191 unique tracks
Gregory Laird has listened to 1140 unique tracks
Joshua Threadgill has listened to 1067 unique tracks
James Davis has listened to 1050 unique tracks
"Thomas Marable has listened to 989 unique tracks
Everett Marlow has listened to 956 unique tracks
Gena Nelson has listened to 927 unique tracks
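One caveat with `line.split(",")` is that it does not understand CSV quoting: a quoted field keeps its quotation marks, and a quoted field that contains a comma is split in two. If cust.csv contains quoted fields (an assumption, not verified here), Python's `csv` module is a safer parser. Below is a minimal sketch on invented rows; only the `CustID` header name is taken from the code above.

```python
import csv

# Invented sample rows; the real cust.csv has more columns
sample_lines = [
    'CustID,Name,Gender,Address',
    '0,"Jane Roe",1,"1 Main Street, Apt 2"',
    '1,John Doe,0,2 Side Street',
]

def create_cust_dict(lines):
    # csv.reader accepts any iterable of lines, including an open file object
    cust_dict = {}
    for fields in csv.reader(lines):
        if fields and fields[0] != "CustID":
            cust_dict[int(fields[0])] = fields[1]
    return cust_dict

cust_dict = create_cust_dict(sample_lines)

# For comparison: naive splitting keeps the quotes and breaks the address apart
naive_fields = sample_lines[1].split(",")
```

With the real file, the same function could be called as `create_cust_dict(f)` inside the `with open(...)` block.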

### 2. Total number of mobile tracks listened to by a customer

1. Apply a `filter` transformation on the tracks_parsed RDD to keep only the tracks that were listened to on a mobile device.
2. Create a pair RDD which contains the Customer ID as the key and 1 as the value.
3. Apply the `reduceByKey` transformation to sum up the mobile tracks listened to by each user.
4. Apply the `take(10)` action and, using the broadcast variable, print the names and the number of mobile tracks listened to by the customers.

In [18]:
#Keeping only the tracks played on a mobile device (mobile is 1 for mobile, 0 otherwise)
filtered_mobile_RDD = tracks_parsed.filter(lambda track: track.mobile == 1)
filtered_pair_RDD = filtered_mobile_RDD.map(lambda track: (track.customer_id, 1))
mobile_tracks_by_customer = filtered_pair_RDD.reduceByKey(lambda a,b: a+b)
#Printing the customer Id, name and the number of mobile tracks listened by a customer
for mobile_track in mobile_tracks_by_customer.take(10):
  print "Customer Id:", mobile_track[0], "named:", cust_names.value[mobile_track[0]], "has listened to", mobile_track[1], "mobile tracks"

### 2. Total number of mobile tracks listened to by a customer using Spark SQL

Spark SQL allows the creation of DataFrames, which have a schema associated with them. After a DataFrame has been created, it can be registered as a temporary table. SQL queries can then be executed on the temporary table using the `sqlContext.sql` method. The queries are optimized by Spark SQL's Catalyst optimizer.

Using SparkSQL in our example context
1. Create a dataframe from the `tracks_parsed` RDD using the `createDataFrame` method.
2. Register the dataframe as a temporary table using the `registerTempTable` method.
3. Execute the query using `sqlContext.sql` method, which returns the result as a dataframe.
4. Apply `take(10)` action and using the broadcast variables print the names and number of mobile tracks listened by the customers.

In [20]:
#Creating a dataframe using sqlContext
tracksDF = sqlContext.createDataFrame(tracks_parsed)
#Registering the dataframe as a temporary table
tracksDF.registerTempTable("TracksTable")
#Execute the sql query using the sqlContext object
sql_results = sqlContext.sql("Select customer_id,count(track_id) as mobile_count from TracksTable where mobile=1 group by customer_id order by mobile_count DESC")
#Printing the customer Id, name and the number of mobile tracks listened by a customer
for result in sql_results.take(10):
  print "Customer Id:", result.customer_id, "named:", cust_names.value[result.customer_id], "has listened to", result.mobile_count, "mobile tracks"
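The shape of the query can be tried out locally without a cluster. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Spark SQL, so it only illustrates the SQL itself, not the DataFrame API. The rows are invented, and `mobile = 1` marks a mobile play, as defined in the Datasets section.

```python
import sqlite3

# In-memory SQLite database as a local stand-in for the temporary table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE TracksTable (customer_id INTEGER, track_id INTEGER, mobile INTEGER)")

# Invented (customer_id, track_id, mobile) rows
rows = [(1, 10, 1), (1, 11, 1), (1, 12, 0), (2, 10, 1), (3, 10, 0)]
conn.executemany("INSERT INTO TracksTable VALUES (?, ?, ?)", rows)

# Count the mobile plays per customer, most active customer first
query = ("SELECT customer_id, COUNT(track_id) AS mobile_count "
         "FROM TracksTable WHERE mobile = 1 "
         "GROUP BY customer_id ORDER BY mobile_count DESC")
results = conn.execute(query).fetchall()
conn.close()
```

Customer 3 has no mobile plays, so it does not appear in `results` at all; the `WHERE` clause removes its rows before grouping.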

### 3. Percentage of tracks listened to by the users during different times of the day

Finding the percentage of tracks for each time of the day requires both the total number of tracks listened to by a user and the number of tracks listened to by that user during each time of the day. Using the `date_time` attribute of the `Track` named tuples in `tracks_parsed`, define a method which determines at what time of the day a particular track was played.

1. Apply a `map` transformation to the `tracks_parsed` RDD to form a pair RDD which contains the Customer ID and the Date Time.
2. Apply a `reduceByKey` transformation to form a list of all the date times at which the user has listened to a track.
3. Implement a custom function which takes all the date times at which the user has listened to tracks as input and returns the percentage of tracks listened to at each time of the day.
4. Using the `take(10)` action and the broadcast variable, print the customer name and the percentage of tracks listened to during each time of the day.

In [22]:
from __future__ import division

def find_percent(numerator,denominator):
  return (round((numerator/denominator)*100, 2))

def compute_stats(user_tracks):
  morning_tracks = afternoon_tracks = evening_tracks = night_tracks = total_tracks = 0
  for track in user_tracks: 
    total_tracks = total_tracks + 1
    # The date time is in the format of 2014-12-01 09:54:09
    date, time = track.split(" ")
    hour_of_day = int(time.split(":")[0])
    if hour_of_day < 5:
      night_tracks = night_tracks + 1
    elif hour_of_day < 12:
      morning_tracks = morning_tracks + 1
    elif hour_of_day < 17:
      afternoon_tracks = afternoon_tracks + 1
    elif hour_of_day < 22:
      evening_tracks = evening_tracks + 1
    else:
      night_tracks = night_tracks + 1
  return (find_percent(morning_tracks,total_tracks), find_percent(afternoon_tracks,total_tracks), find_percent(evening_tracks,total_tracks), find_percent(night_tracks,total_tracks))
  
user_stats = tracks_parsed.map(lambda track: (track.customer_id, [track.date_time])).reduceByKey(lambda x,y:x+y).mapValues(lambda x: compute_stats(x))

for stat in user_stats.take(10):
  print "Customer:", cust_names.value[stat[0]], "has listened to", stat[1][0], "% tracks in the morning,", stat[1][1], "% in the afternoon,", stat[1][2], "% in the evening, and", stat[1][3], "% in the night"
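The hour boundaries used in `compute_stats` are easy to get wrong at the edges, so it can help to check them in isolation. The sketch below restates the same ranges as a hypothetical helper, `time_of_day`, which is not part of the notebook. Apart from the first timestamp, which comes from the comment in the code above, the sample values are invented.

```python
def time_of_day(hour_of_day):
    """Hypothetical helper mirroring the hour ranges used in compute_stats."""
    if hour_of_day < 5 or hour_of_day >= 22:
        return "night"        # 22:00 to 04:59
    elif hour_of_day < 12:
        return "morning"      # 05:00 to 11:59
    elif hour_of_day < 17:
        return "afternoon"    # 12:00 to 16:59
    return "evening"          # 17:00 to 21:59

date_times = [
    "2014-12-01 09:54:09",
    "2014-12-01 04:59:59",
    "2014-12-01 12:00:00",
    "2014-12-01 21:30:00",
    "2014-12-01 22:00:00",
]

# Extract the hour the same way compute_stats does, then classify it
buckets = [time_of_day(int(dt.split(" ")[1].split(":")[0])) for dt in date_times]
```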

### 4. Most popular track and its name

The most popular track is the track which has been played the most times across all customers.

1. Map through the `tracks_parsed` RDD and form a Pair RDD of `(track_id, 1)`.
2. Apply the reduceByKey transformation to find the total number of times each track is played.
3. Apply the reduce action, which compares the number of times a track is played and finds the track which is played the most
4. Create a broadcast variable containing the names of the track corresponding to its track ID

#### Creating the Track names broadcast variable

Download the data from the MapR GitHub repo from <a href="https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/music.csv">here</a> and save it into the tmp folder of your local file system. Then define a method which builds the dictionary of track names and create a broadcast variable from it.

In [25]:
%sh 
wget -P /tmp "https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/music.csv"

In [26]:
# A method which creates a dictionary which maps Track Id to Track Name 
def create_music_dict():
  music_dict = {}
  with open('/tmp/music.csv') as f:
    for line in f:
      fields = line.split(",")
      if (fields[0] !="TrackId"):
        track_id = int(fields[0])
        title = str(fields[1])
        artist = str(fields[2])
        length = int(fields[3])
        music_dict[track_id] = title
  return music_dict

#Creating the broadcast variable tracks_info
tracks_info = sc.broadcast(create_music_dict())

In [27]:
track_counts = tracks_parsed.map(lambda track: (track.track_id,1)).reduceByKey(lambda a,b: a+b)
most_popular_track = track_counts.reduce(lambda track1, track2: track1 if track1[1]>track2[1] else track2)
print "The most popular track is", tracks_info.value[most_popular_track[0]], "and is played", most_popular_track[1] , "times"
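The function passed to `reduce` has to be able to return either of its two arguments; a comparison that always returns the same one cannot find the maximum. The pairwise logic can be checked locally with `functools.reduce`, used here as a stand-in for the RDD `reduce` action, on invented `(track_id, count)` pairs.

```python
from functools import reduce

# Invented (track_id, play_count) pairs
track_counts = [(10, 3), (11, 7), (12, 5)]

# Keep whichever of the two pairs has the higher play count
most_popular = reduce(lambda t1, t2: t1 if t1[1] > t2[1] else t2, track_counts)

# The built-in max with a key function expresses the same idea
same_with_max = max(track_counts, key=lambda t: t[1])
```

Both expressions pick track 11, which has the highest play count in the sample.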