In [1]:
# Scalability: 8 executors, 8 cores. 


from pyspark.sql import SparkSession

# (8 cores, 16gb per machine) x 5 = 40 cores

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.1.153:7077") \
        .appName("Group12_project_1")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.initialExecutors",10)\
        .config("spark.dynamicAllocation.minExecutors",8)\
        .config("spark.dynamicAllocation.maxExecutors",8)\
        .config("spark.dynamicAllocation.executorIdleTimeout","60s")\
        .config("spark.executor.cores",8)\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

In [2]:
import h5py
import sys
import io

# h5py doesn't know anything about HDFS, and will be unable to work with the 'magic' HDFS filenames.
# As a workaround, we can load the file into memory with Spark and then read the file.

In [3]:
#rdd = spark_context.binaryFiles("hdfs://host-192-168-1-153-ldsa:9000/millionsongs/data/A/A/A/TRAAAAW128F429D538.h5")
rdd = spark_context.binaryFiles("hdfs://192.168.1.153:9000/millionsongs/data/*/*/*/*")

def f(x):
    
    # x[0] = filename
    # x[1] = binary content
    with h5py.File(io.BytesIO(x[1])) as f:

        artistName        = f['metadata']['songs']['artist_name'][0].decode('UTF-8')
        songTitle         = f['metadata']['songs']['title'][0].decode('UTF-8')
        songHotness       = f['metadata']['songs']['song_hotttnesss'][0].item()
        artistHotness     = f['metadata']['songs']['artist_hotttnesss'][0].item()
        latitude          = f['metadata']['songs']['artist_latitude'][0].item()
        longitude         = f['metadata']['songs']['artist_longitude'][0].item()
        duration          = f['analysis']['songs']['duration'][0].item()
        tempo             = f['analysis']['songs']['tempo'][0].item()
        year              = f['musicbrainz']['songs']['year'][0].item()
        
        return [artistName, songTitle, songHotness, artistHotness, 
                latitude, longitude, duration, tempo, year]

rdd = rdd.map(f)
rdd.take(2)

[['Andy Andy',
  'La Culpa',
  0.0,
  0.3794821702804675,
  nan,
  nan,
  226.35057,
  130.04,
  0],
 ['Faiz Ali Faiz',
  'Sohna Nee Sohna Data',
  nan,
  0.30667609347902947,
  nan,
  nan,
  599.24853,
  99.273,
  0]]

In [4]:
from pyspark.sql.types import *

fields = [StructField("aristName", StringType(), True),
          StructField("songTitle", StringType(), True),
          StructField("songHotness", FloatType(), True),
          StructField("artistHottness", FloatType(), True),
          StructField("latitude", FloatType(), True),
          StructField("longitude", FloatType(), True),
          StructField("duration", FloatType(), True),
          StructField("tempo", FloatType() ,True),
          StructField("year", IntegerType(), True)]

schema = StructType(fields)

songs = spark_session.createDataFrame(rdd, schema)

In [5]:
songs.printSchema()

root
 |-- aristName: string (nullable = true)
 |-- songTitle: string (nullable = true)
 |-- songHotness: float (nullable = true)
 |-- artistHottness: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- duration: float (nullable = true)
 |-- tempo: float (nullable = true)
 |-- year: integer (nullable = true)



In [6]:
songs.show()
#songs.show(vertical = True)

+--------------------+--------------------+-----------+--------------+--------+----------+---------+-------+----+
|           aristName|           songTitle|songHotness|artistHottness|latitude| longitude| duration|  tempo|year|
+--------------------+--------------------+-----------+--------------+--------+----------+---------+-------+----+
|           Andy Andy|            La Culpa|        0.0|    0.37948218|     NaN|       NaN|226.35057| 130.04|   0|
|       Faiz Ali Faiz|Sohna Nee Sohna Data|        NaN|     0.3066761|     NaN|       NaN|599.24854| 99.273|   0|
|        SUE THOMPSON|James (Hold The L...| 0.49529362|    0.30624226|37.83721| -94.35868|124.86485|137.522|1985|
|      Five Bolt Main|Made Like This (L...|  0.3759843|    0.36365137|     NaN|       NaN| 225.0967|164.672|   0|
|          Faye Adams|Crazy Mixed Up World|        0.0|    0.29923037|40.73197| -74.17418|156.39465| 95.957|1961|
|         John Wesley|   The Emperor Falls| 0.37753165|     0.4527895|27.94017| -82.3254

In [7]:

# Size of the dataframe
print("Rows in  Dataframe:", songs.count())

# RDD partitions
print("Number of Partitions:", songs.rdd.getNumPartitions())

Rows in  Dataframe: 10000
Number of Partitions: 21


In [8]:
import time
times = []
start_time = time.time()

df_filtered = songs.filter((songs.latitude != 'NaN') & (songs.songHotness != 'NaN') &
                           (songs.songHotness != 0.0) & (songs.year != 0))
df_filtered.show()

end_time = time.time()
total_time = end_time - start_time
times.append(total_time)
print("Evaluation time: {} seconds".format(total_time))

+--------------------+--------------------+-----------+--------------+---------+----------+---------+-------+----+
|           aristName|           songTitle|songHotness|artistHottness| latitude| longitude| duration|  tempo|year|
+--------------------+--------------------+-----------+--------------+---------+----------+---------+-------+----+
|        SUE THOMPSON|James (Hold The L...| 0.49529362|    0.30624226| 37.83721| -94.35868|124.86485|137.522|1985|
|     Nine Inch Nails|               Metal|  0.6355367|     0.5984585| 41.50471| -81.69074| 426.8926| 63.017|2000|
|        Bob Neuwirth|     Biding Her Time|  0.4806106|    0.30150247| 40.71455| -74.00712|241.10976|152.267|1990|
|       Suzanne Ciani|              Mozart| 0.60850006|    0.38070452| 37.77916|-122.42005|192.20853|139.189|1989|
|           Pearl Jam|          Inside Job|  0.6593047|     0.6079718| 47.60356|-122.32944|428.56445|105.994|2006|
|        Jimmy Wakely|Beautiful Brown eyes| 0.46264318|    0.29153973| 34.31109|

In [9]:
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
from ipykernel import kernelapp as app

import time
'''
Each geolocation service you might use, such as Google Maps, Bing Maps, or Nominatim, has its own class in
geopy.geocoders abstracting the service’s API. Geocoders each define at least a geocode method, for resolving
a location from a string, and may define a reverse method, which resolves a pair of coordinates to an address. Each
Geocoder accepts any credentials or settings needed to interact with its service, e.g., an API key or locale, during its
initialization.

May need python3 -m pip install geopy --user
'''

def setCountry(coor):

    
    geolocator = Nominatim(user_agent="Million Song Dataset project")
    geocode = RateLimiter(geolocator.geocode, min_delay_seconds=1.5)
    location = geolocator.reverse((coor[0],coor[1]))
    return location.raw['address']['country']
    
country = setCountry([34.302459716796875, -79.87162017822266])
print(country)

#countries = songs.map(setCountry(songs.latitude, songs.longitude))

#countries.take(5)

USA


In [10]:
#Filter the songs which have a "Hotness" greater than 70%, to determinate the most popular songs.
import time
times = []
start_time = time.time()

selected_songs = df_filtered.filter(df_filtered.songHotness >= 0.7)
selected_songs.show()

end_time = time.time()
total_time = end_time - start_time
times.append(total_time)
print("Evaluation time: {} seconds".format(total_time))

+--------------------+--------------------+-----------+--------------+--------+----------+---------+-------+----+
|           aristName|           songTitle|songHotness|artistHottness|latitude| longitude| duration|  tempo|year|
+--------------------+--------------------+-----------+--------------+--------+----------+---------+-------+----+
|          Lisa Lynne|       Isla del Luna| 0.76130325|    0.42093775|34.05349|-118.24532|316.49915| 95.981|1999|
|         Cancer Bats|            Sabotage|  0.7262674|    0.53384274|43.64856| -79.38533|180.71465|  86.01|2010|
|       Avril Lavigne|           Innocence| 0.92713255|    0.59859675|44.16104|  -77.3819|  232.202|138.364|2007|
|Del Tha Funkee Ho...|       Mistadobalina|  0.7589738|    0.45843947|37.80506|-122.27302| 257.6975| 99.566|1991|
|       Kings Of Leon|              Genius|  0.9102251|    0.78880596|36.16778| -86.77836|168.82893|125.875|2003|
|        Circle Jerks|       Paid Vacation|  0.7795272|     0.4140795|34.05349|-118.2453

In [11]:
#Total of songs form the original dataset which "hotness" is greater than 70%
selected_songs.count()

99

In [12]:
#Classification of singers/band by their amount of popular songs. 

singer=selected_songs.groupBy(selected_songs.aristName).count().orderBy('count',ascending=False).show()

+--------------------+-----+
|           aristName|count|
+--------------------+-----+
|       Kings Of Leon|    3|
|    Public Image Ltd|    3|
|   The White Stripes|    3|
|       Scar Symmetry|    3|
|           Pennywise|    2|
|Arsonists Get All...|    2|
|          Lisa Lynne|    2|
|          Nickelback|    2|
|             OutKast|    2|
|        Circle Jerks|    2|
|         Alicia Keys|    2|
|        Daddy Yankee|    2|
|         The Strokes|    2|
|            The Fray|    2|
|                 M83|    2|
|   Lighthouse Family|    2|
|        Tyrone Wells|    2|
|         Linkin Park|    1|
|                 Tad|    1|
|          Yellowcard|    1|
+--------------------+-----+
only showing top 20 rows



In [13]:
#Statistics about the "Tempo" of the 99 most popular songs. 
selected_songs.describe('tempo').show()

+-------+------------------+
|summary|             tempo|
+-------+------------------+
|  count|                99|
|   mean|127.78080803456933|
| stddev| 29.05244925733072|
|    min|            84.373|
|    max|           201.566|
+-------+------------------+



In [14]:
#Selection of artist/bands who had the most popular songs multiple times (3 times each) 
#Creation of dictionary to do the filter.

singers = ['Scar Symmetry', 'Public Image Ltd', 'The White Stripes','Kings Of Leon']

best_songs=selected_songs.filter(selected_songs.aristName.isin(singers))

best_songs.show()

+-----------------+--------------------+-----------+--------------+--------+---------+---------+-------+----+
|        aristName|           songTitle|songHotness|artistHottness|latitude|longitude| duration|  tempo|year|
+-----------------+--------------------+-----------+--------------+--------+---------+---------+-------+----+
|    Kings Of Leon|              Genius|  0.9102251|    0.78880596|36.16778|-86.77836|168.82893|125.875|2003|
|    Kings Of Leon|       Day Old Blues|  0.7247888|    0.83771473|36.16778|-86.77836|212.97588| 85.855|2004|
|The White Stripes|Im Slowly Turnin...|  0.7076683|     0.6538972|42.33168|-83.04792|274.59872|131.279|2007|
|    Scar Symmetry|     The Illusionist|  0.7163213|    0.45582598|62.19845| 17.55142| 271.5424|150.086|2006|
|    Scar Symmetry|The Three-Dimensi...|  0.7101015|    0.45582598|62.19845| 17.55142|237.84444|110.107|2008|
|    Kings Of Leon|               Ragoo|  0.7461609|    0.78880596|36.16778|-86.77836|181.13261|102.066|2007|
| Public I

In [15]:
#Locations from where the most popular songs were produced
location_of_most_popular_songs=best_songs.select('longitude' , 'latitude')
location_of_most_popular_songs.show()

+---------+--------+
|longitude|latitude|
+---------+--------+
|-86.77836|36.16778|
|-86.77836|36.16778|
|-83.04792|42.33168|
| 17.55142|62.19845|
| 17.55142|62.19845|
|-86.77836|36.16778|
| -0.12714|51.50632|
|-83.04792|42.33168|
| -0.12714|51.50632|
| 17.55142|62.19845|
|-83.04792|42.33168|
| -0.12714|51.50632|
+---------+--------+



In [16]:
#Classification of the locations where the most popular song were produced.
Location=location_of_most_popular_songs.groupBy(best_songs.latitude,best_songs.longitude).count()\
.orderBy('count',ascending=False)

In [17]:
Location.show()

+--------+---------+-----+
|latitude|longitude|count|
+--------+---------+-----+
|42.33168|-83.04792|    3|
|36.16778|-86.77836|    3|
|62.19845| 17.55142|    3|
|51.50632| -0.12714|    3|
+--------+---------+-----+



In [18]:
#Convertion of "coordinates" to input in SetCountry fuction.
from pyspark.sql.functions import array
location_list=Location.withColumn("Coordinates", array(Location.latitude, Location.longitude))
Country_list=location_list.select("Coordinates").rdd.flatMap(lambda x: x).collect()



In [19]:
print(Country_list)

[[51.50632095336914, -0.12714000046253204], [36.167781829833984, -86.77835845947266], [62.198448181152344, 17.551420211791992], [42.33168029785156, -83.04792022705078]]


In [20]:
#Use of the SetCountry function to determinate Country by coordinates. 
    
def countries(Countries):
        list_con=[]
        lista=[]
        for i in range(0,len(Countries)):
         countries_list= setCountry(Countries[i])
         list_con.append(countries_list)
         lista.append([list_con[i], Countries[i]])         
        return lista
    
countries=(countries(Country_list))
print(countries)


[['UK', [51.50632095336914, -0.12714000046253204]], ['USA', [36.167781829833984, -86.77835845947266]], ['Sverige', [62.198448181152344, 17.551420211791992]], ['USA', [42.33168029785156, -83.04792022705078]]]


In [21]:
#Countries with the most popular songs : USA, UK, Sweden. 


In [22]:
#Evaluation of Global Location vs Tempo ( obtaining max, min and avg at each location)
times = []
start_time = time.time()

from pyspark.sql import functions as F

Global_location=df_filtered.groupBy(df_filtered.latitude,df_filtered.longitude)\
.agg(F.min(df_filtered.tempo),F.max(df_filtered.tempo),F.avg(df_filtered.tempo))

Global_location=Global_location.withColumn("Coordinates", array(Global_location.latitude, Global_location.longitude))

Global_location.show()

end_time = time.time()
total_time = end_time - start_time
times.append(total_time)
print("Evaluation time: {} seconds".format(total_time))

+---------+---------+----------+----------+------------------+--------------------+
| latitude|longitude|min(tempo)|max(tempo)|        avg(tempo)|         Coordinates|
+---------+---------+----------+----------+------------------+--------------------+
| 30.08374| 31.25536|    88.928|    128.47|108.69900131225586|[30.08374, 31.25536]|
| 43.07295|-89.38669|    91.498|    97.265| 94.88166809082031|[43.07295, -89.38...|
|  33.7621|-90.30507|   114.255|   114.255|114.25499725341797|[33.7621, -90.30507]|
| 42.32807| -83.7336|   141.002|   141.002| 141.0019989013672|[42.32807, -83.7336]|
| 40.85251|-73.13585|    95.254|   147.745|124.67233276367188|[40.85251, -73.13...|
| 54.48303| -3.53444|   101.526|   124.175|109.12133534749348|[54.48303, -3.53444]|
|  -13.442| -41.9952|    91.879|   150.155|121.01699829101562| [-13.442, -41.9952]|
| 33.95319|-84.54588|    90.291|    90.291| 90.29100036621094|[33.95319, -84.54...|
| 44.16104| -77.3819|   138.364|   138.364|138.36399841308594|[44.16104, -77

In [23]:
#List of locations to use GeoPy
Global_list=Global_location.select("Coordinates").rdd.flatMap(lambda x: x).collect()



In [24]:
#Number of diffetent locations.
len(Global_list)

315

In [25]:
# I had to split the list becase GEOPY has a restrition on timerequest also a problem with tuplet #119?? Don't know the reason. 
Global_list_1=(Global_list[0:100])
Global_list_21=(Global_list[101:118])
Global_list_22=(Global_list[120:200])
Global_list_3=(Global_list[201:315])


In [26]:

def countries(Countries):
        list_con=[]
        lista=[]
        for i in range(0,len(Countries)):
         countries_list= setCountry(Countries[i])
         list_con.append(countries_list)
         lista.append((list_con[i],Countries[i]))         
        return lista
    
    
Global_countries_1=(countries(Global_list_1))





In [27]:
Global_countries_22=(countries(Global_list_22))


In [28]:
Global_countries_21=(countries(Global_list_21))

In [30]:
Global_countries_3=(countries(Global_list_3))

In [31]:
#Merge the list of countries, this was necessary because the request restriction of GeoPy.
Countries_list = (*Global_countries_1,*Global_countries_21, *Global_countries_22,*Global_countries_3)

In [32]:
#Convert the list to an array, which will be used as a dictionary to determinate the locations in the dataframe.
import numpy as np
myarray = np.asarray(Countries_list)

In [33]:
#Determinating the locations to the dataframe. 
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

times = []
start_time = time.time()

Country_key=udf(lambda value:next((x[0] for x in myarray if x[1] == value ), None),StringType())
        

new_location_data_frame = Global_location.withColumn('Country',Country_key(Global_location.Coordinates))
new_location_data_frame.show()

end_time = time.time()
total_time = end_time - start_time
times.append(total_time)
print("Evaluation time: {} seconds".format(total_time))

+---------+---------+----------+----------+------------------+--------------------+---------+
| latitude|longitude|min(tempo)|max(tempo)|        avg(tempo)|         Coordinates|  Country|
+---------+---------+----------+----------+------------------+--------------------+---------+
| 30.08374| 31.25536|    88.928|    128.47|108.69900131225586|[30.08374, 31.25536]|      مصر|
| 43.07295|-89.38669|    91.498|    97.265| 94.88166809082031|[43.07295, -89.38...|      USA|
|  33.7621|-90.30507|   114.255|   114.255|114.25499725341797|[33.7621, -90.30507]|      USA|
| 42.32807| -83.7336|   141.002|   141.002| 141.0019989013672|[42.32807, -83.7336]|      USA|
| 40.85251|-73.13585|    95.254|   147.745|124.67233276367188|[40.85251, -73.13...|      USA|
| 54.48303| -3.53444|   101.526|   124.175|109.12133534749348|[54.48303, -3.53444]|       UK|
|  -13.442| -41.9952|    91.879|   150.155|121.01699829101562| [-13.442, -41.9952]|   Brasil|
| 33.95319|-84.54588|    90.291|    90.291| 90.2910003662109

In [34]:
print("Evaluation time: {} seconds".format(total_time))

Evaluation time: 41.55136752128601 seconds


In [35]:
#Grouping the information by Country.

Total_countries=new_location_data_frame.groupBy(new_location_data_frame.Country).count().orderBy('count',ascending=False)

In [36]:
Total_countries.count()

39

In [37]:
#Getting the tempo values for each country.
Tempo_per_country=new_location_data_frame.groupBy(new_location_data_frame.Country)\
.agg(F.min('min(tempo)'),F.max('max(tempo)'),F.avg('avg(tempo)'))

In [38]:
#Final result.
#The null probably is the #119, 

Tempo_per_country.show(39)

+--------------------+---------------+---------------+------------------+
|             Country|min(min(tempo))|max(max(tempo))|   avg(avg(tempo))|
+--------------------+---------------+---------------+------------------+
|             Danmark|        109.984|        112.222|111.10300064086914|
|              Србија|        156.948|        156.948|  156.947998046875|
|United States of ...|        127.601|        127.601|127.60099792480469|
|               Norge|         95.138|        164.595|118.40124893188477|
|              Italia|         69.768|        160.071|110.65499877929688|
|              España|         90.645|         180.04|126.22749710083008|
|               Suomi|         71.758|        165.908|128.41321023305258|
|              France|         76.296|        241.892|118.44690651363797|
|Schweiz/Suisse/Sv...|        142.192|        142.192|142.19200134277344|
|                null|          72.74|        144.538|117.48724937438965|
|           Argentina|            0.0|