In [1]:
import pyspark
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (IntegerType, 
                               StringType, 
                               StructType, 
                               StructField)

from tqdm.notebook import tqdm

conf = SparkConf()

In [2]:
# Kubernetes is a Spark master in my setup. 
# It creates pods with Spark workers, orchestrates those 
# workers and returns final results to the Spark driver 
# (“k8s://https://” is NOT a typo, this is how Spark knows the “provider” type). 
conf.setAppName("Last Descriptive Analysis").setMaster("k8s://https://192.168.4.60:6443") 

# Worker pods are created from the base Spark docker image.
# If you use another image, specify its name instead.

conf.set(
    "spark.kubernetes.container.image", 
    "johnb340/spark-executor:v1alpha") 


conf.set(
    "spark.kubernetes.container.image.pullPolicy", 
    "IfNotPresent") 

# Authentication certificate and token (required to create worker pods):
conf.set(
    "spark.kubernetes.authenticate.caCertFile", 
    "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
conf.set(
    "spark.kubernetes.authenticate.oauthTokenFile", 
    "/var/run/secrets/kubernetes.io/serviceaccount/token")

# Service account which should be used for the driver
conf.set(
    "spark.kubernetes.authenticate.driver.serviceAccountName", 
    "spark-minion") 

# 2 pods/workers will be created. Can be expanded for larger workloads.
conf.set("spark.executor.instances", "5") 

# The DNS alias for the Spark driver. Required by executors to report status.
conf.set( "spark.driver.host", "jupyter-spark-driver.default")

conf.set("spark.pyspark.driver.python","jupyter")

conf.set("spark.pyspark.driver.python.opts","notebook")

conf.set("spark.kubernetes.namespace","default")

conf.set("spark.driver.port", "20020")

# Initialize spark context, create executors
#sc = pyspark.SparkContext(conf=conf)


sc = pyspark.SparkContext(conf=conf)

23/02/24 02:41:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
spark = SparkSession(sc)

In [4]:
path = "usersha1-artmbid-artname-plays.tsv"

In [5]:
df = pd.read_csv(path, sep="\t", nrows=700000)

In [6]:
df_schema = StructType([StructField("user", StringType(), True),
                       StructField("auid", StringType(), True),
                       StructField("artist_name", StringType(), True),
                       StructField("listens", IntegerType(), True)])

In [7]:
df = spark.createDataFrame(df, df_schema)

In [8]:
plays = df.rdd

In [9]:
plays.take(5)

23/02/24 02:41:18 WARN TaskSetManager: Stage 0 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[Row(user='00000c289a1829a808ac09c00daf10bc3c4e223b', auid='f2fb0ff0-5679-42ec-a55c-15109ce6e320', artist_name='die Ärzte', listens=1099),
 Row(user='00000c289a1829a808ac09c00daf10bc3c4e223b', auid='b3ae82c2-e60b-4551-a76d-6620f1b456aa', artist_name='melissa etheridge', listens=897),
 Row(user='00000c289a1829a808ac09c00daf10bc3c4e223b', auid='3d6bbeb7-f90e-4d10-b440-e153c0d10b53', artist_name='elvenking', listens=717),
 Row(user='00000c289a1829a808ac09c00daf10bc3c4e223b', auid='bbd2ffd7-17f4-4506-8572-c1ea58c3f9a8', artist_name='juliette & the licks', listens=706),
 Row(user='00000c289a1829a808ac09c00daf10bc3c4e223b', auid='8bfac288-ccc5-448d-9573-c33ea2aa5c30', artist_name='red hot chili peppers', listens=691)]

In [10]:
type(plays)

pyspark.rdd.RDD

In [11]:
#we only want userid, artist_id  and number of plays; these are found in the following columns
ratings_data_pertinent = plays.map(lambda x: (x[0], x[2],(x[3])))

In [12]:
###BEGIN DATA Munging -- 

In [13]:
# Let's make a numerical id out of the encrypted user_id feature:
#Note: it may be easier to just assign a new id, rather than using the encrypted user_id field.
#However, this project is to demonstrate data cleaning as well as model development.
#Furthermore, what if assigning a new id is  not desirable in a situation?
#For better or worse, perhaps we will be creating a user_id we can use, the "hard way,"
#The approache we take for the artist_id field could certainly be used and would be easier. 
#But, easier does not always equal better.

In [14]:
# Perhaps regex would be more efficient and easier, but I find dictionaries extremely useful when I need to replace words or
# or remove words. It seems regex can  be more "large-handed" and clumsy. Dictionaries are very precise.
#Im defining a  method here to replace the letters in the artist MD5-SHA with numbers:
import string

def numerizer(word):
    translation = str.maketrans("","", string.punctuation)
    new_word = word.translate(translation)
    new_word_lower = new_word.lower()
    return new_word_lower.strip(" ")

def md5_replace(word,_dict):
    i = list(_dict.keys())
    j = list(_dict.values())
    k = len(j)-1
    _word = numerizer(word)
    while k >= 0:
        _word1 = str.replace(_word, i[k], j[k])
        _word =_word1  
        k-= 1
    return _word1


In [15]:
alpha_dict = {"a":"1", "b":"2","c":"3","d":"4", "e":"5","f":"6","g":"7",
              "h":"8","i":"9","j":"10","k":"11","l":"12","m":"13","n":"14","o":"15",
                  "p":"16","q":"17","r":"18", "s":"19","t":"20","u":"21","v":"22","w":"23","x":"24",
                  "y":"25","z":"26"," ":"0", "ä":"27","ö":"28"}

In [16]:
# Get rid of leading zeroes which will be problematic and apply alpha character function
numeric_uid = ratings_data_pertinent.map(lambda x: md5_replace(x[0], alpha_dict)).map(lambda line: line.lstrip("0"))

In [17]:
#We can see below that we have a numeric uid now. This user has the first 49 entries. However, we need to cut down 
# The size of this number; alot!
numeric_uid.take(5)

23/02/24 02:41:25 WARN TaskSetManager: Stage 1 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

['32891182918081309300416102333452232',
 '32891182918081309300416102333452232',
 '32891182918081309300416102333452232',
 '32891182918081309300416102333452232',
 '32891182918081309300416102333452232']

In [18]:
def shrinker(num):
    places = len(num)-9
 
    if places > 0:
        zeroes = places 
      
        cutter = "1"
        for x in range(zeroes):
             cutter+=str(0)
        return str(round(int(num)/float(cutter)))

    else:
        return num
    

In [19]:
numeric_uid = numeric_uid.map(lambda x : shrinker(x))

In [20]:
numeric_uid.take(10)

23/02/24 02:41:30 WARN TaskSetManager: Stage 2 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

['328911829',
 '328911829',
 '328911829',
 '328911829',
 '328911829',
 '328911829',
 '328911829',
 '328911829',
 '328911829',
 '328911829']

In [21]:
#we need to keep track of index
numeric_uid_ = numeric_uid.zipWithIndex()

23/02/24 02:41:34 WARN TaskSetManager: Stage 3 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [22]:
numeric_uid_.take(5)

23/02/24 02:41:44 WARN TaskSetManager: Stage 4 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[('328911829', 0),
 ('328911829', 1),
 ('328911829', 2),
 ('328911829', 3),
 ('328911829', 4)]

In [23]:
numeric_uid_ = numeric_uid_.map(lambda x : (x[1],x[0]))

In [24]:
numeric_uid_.take(5)

23/02/24 02:41:46 WARN TaskSetManager: Stage 5 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[(0, '328911829'),
 (1, '328911829'),
 (2, '328911829'),
 (3, '328911829'),
 (4, '328911829')]

In [25]:
#To get all of the artists we will use groupby. Then we can assign a number to each of the unique artists
artist_name = ratings_data_pertinent.map(lambda x: (x[1])).distinct()

In [26]:
artist_name.top(5)

23/02/24 02:41:49 WARN TaskSetManager: Stage 6 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

['ｔhe gazette', 'ｃｈａｒｉｏｔｓ', 'ｂｏｏｇｉｅｍａｎ', 'ａｔｔｉｃ', 'ＹＵＩ']

In [27]:
#Make sure we still have all the artists
artist_name.count()

67554

In [28]:
#Now we can add a unique id for each artist name
artist_index=sc.parallelize(range(1,67554))

In [29]:
artist_index = artist_index.zipWithIndex()
artist_name = artist_name.zipWithIndex()

In [30]:
artist_index = artist_index.map(lambda x: (x[1],x[0]))
artist_name = artist_name.map(lambda x: (x[1],x[0]))

In [31]:
artist_name.take(5)

[(0, 'the clash'),
 (1, 'lao che'),
 (2, 'julian cope'),
 (3, 'eläkeläiset'),
 (4, 'mamady keita')]

In [32]:
#numeric_aid_indexed = numeric_aid_index.zip(numeric_aid_)
aid_indexed = artist_index.join(artist_name)

In [33]:
aid_indexed.top(5)

[(67552, (67553, 'bloke')),
 (67551, (67552, 'dolores o´riordan')),
 (67550, (67551, 'acroma')),
 (67549, (67550, 'marcel türkowsky')),
 (67548, (67549, 'e*rock'))]

In [34]:
aid_indexed = aid_indexed.map(lambda x: (x[1][1],x[0]))

In [35]:
aid_indexed.top(5)

[('ｔhe gazette', 21103),
 ('ｃｈａｒｉｏｔｓ', 50645),
 ('ｂｏｏｇｉｅｍａｎ', 37987),
 ('ａｔｔｉｃ', 62845),
 ('ＹＵＩ', 53211)]

In [36]:
artist_titles = ratings_data_pertinent.map(lambda x:(x[1]))

In [37]:
artist_titles.take(5)

23/02/24 02:42:00 WARN TaskSetManager: Stage 21 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

['die Ärzte',
 'melissa etheridge',
 'elvenking',
 'juliette & the licks',
 'red hot chili peppers']

In [38]:
#To use the index as a sort of lookup table, we need to estabilish a common index
artists_ = artist_titles.keyBy(lambda r : r)
index_ = aid_indexed.keyBy(lambda r: r[0])

In [39]:
artists_.take(5)

23/02/24 02:42:02 WARN TaskSetManager: Stage 22 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[('die Ärzte', 'die Ärzte'),
 ('melissa etheridge', 'melissa etheridge'),
 ('elvenking', 'elvenking'),
 ('juliette & the licks', 'juliette & the licks'),
 ('red hot chili peppers', 'red hot chili peppers')]

In [40]:
index_.take(5)

[('detektivbyrån', ('detektivbyrån', 54050)),
 ('matt & kim', ('matt & kim', 54060)),
 ('the residents', ('the residents', 54070)),
 ('cheek', ('cheek', 54080)),
 ('my chemical romance', ('my chemical romance', 54090))]

In [41]:
artists_index = artists_.join(index_)

In [42]:
#Heres our artists index
artists_index.take(5)

23/02/24 02:42:04 WARN TaskSetManager: Stage 28 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[('miles davis', ('miles davis', ('miles davis', 0))),
 ('miles davis', ('miles davis', ('miles davis', 0))),
 ('miles davis', ('miles davis', ('miles davis', 0))),
 ('miles davis', ('miles davis', ('miles davis', 0))),
 ('miles davis', ('miles davis', ('miles davis', 0)))]

In [43]:
# Take just the tuple we need
artist_index = artists_index.map(lambda x: (x[1][1]))

In [44]:
artist_index.take(5)

[('rotting christ', 195),
 ('rotting christ', 195),
 ('rotting christ', 195),
 ('rotting christ', 195),
 ('rotting christ', 195)]

In [45]:
ratings_data = ratings_data_pertinent.map(lambda x: (x[2]))

In [46]:
ratings_data.take(5)

23/02/24 02:42:12 WARN TaskSetManager: Stage 34 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[1099, 897, 717, 706, 691]

In [47]:
artist_names = artist_index.map(lambda x: (x[0]))
artist_id = artist_index.map(lambda x: (x[1]))

In [48]:
ratings = ratings_data.zipWithIndex()
artist_index_ = artist_index.zipWithIndex()
uids = numeric_uid.zipWithIndex()

23/02/24 02:42:14 WARN TaskSetManager: Stage 35 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
23/02/24 02:42:21 WARN TaskSetManager: Stage 40 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [49]:
uids.take(5)

23/02/24 02:42:29 WARN TaskSetManager: Stage 41 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[('328911829', 0),
 ('328911829', 1),
 ('328911829', 2),
 ('328911829', 3),
 ('328911829', 4)]

In [50]:
artist_index_.take(5)

[(('the clash', 4), 0),
 (('the clash', 4), 1),
 (('the clash', 4), 2),
 (('the clash', 4), 3),
 (('the clash', 4), 4)]

In [51]:
artist_index_ = artist_index_.map(lambda x: (x[1],x[0]))
uids = uids.map(lambda x: (x[1],x[0]))
ratings = ratings.map(lambda x: (x[1],x[0]))

In [52]:
ratings.take(5)

23/02/24 02:42:32 WARN TaskSetManager: Stage 46 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[(0, 1099), (1, 897), (2, 717), (3, 706), (4, 691)]

In [53]:
RDD = uids.join(artist_index_).join(ratings)

In [54]:
RDD.take(5)

23/02/24 02:42:34 WARN TaskSetManager: Stage 50 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
23/02/24 02:42:44 WARN TaskSetManager: Stage 51 contains a task of very large size (7825 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[(560225, (('825449122', ('o.n.a.', 29173)), 115)),
 (560325, (('825589403', ('the view', 27221)), 584)),
 (560425, (('826304432', ('taking back sunday', 27347)), 195)),
 (560525, (('826668727', ('taking back sunday', 27347)), 304)),
 (560625, (('830040544', ('taking back sunday', 27347)), 58))]

In [55]:
def split_tuple(data):
    v = data[0]
    w = data[1][0][0]
    x = data[1][0][1][1]
    y = data[1][0][1][0]
    z = data[1][1]
    return v, w, x , y, z

In [56]:
_RDD = RDD.map(lambda x: split_tuple(x))

In [57]:
_RDD.take(2)

[(279650, '421213018', 13594, 'slayer', 23),
 (279750, '421739456', 13518, 'anthrax', 69)]

In [58]:
rdd = _RDD.collect()


                                                                                

In [59]:
with tqdm(total=len(rdd)) as pbar:
    with open("finalRDD.csv", "w") as file:
        
        for r in rdd:

            file.write(f"{r[0]}, {r[1]}, {r[2]}, '{r[3]}'\n")
        
            pbar.update(1)


  0%|          | 0/699999 [00:00<?, ?it/s]