# **Installing Dependencies**

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 5.0 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=783839b885525d973b8aaf1023f729267706009b27c7aa9149192ca2b1a24739
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max, struct
import matplotlib.pyplot as plt
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [None]:
# Create (or reuse) the Spark session for this notebook.
# The trailing call parentheses matter: without them `spark` would be bound to
# the builder's `getOrCreate` method object instead of a SparkSession.
spark = SparkSession.builder.appName('bigdata_project').getOrCreate()

In [None]:
# Short alias for the active SparkContext (one is started if none is running).
sc = SparkContext.getOrCreate()

# Entry point used by the data-loading cells (`sqlContext.read...`).
# SQLContext has been deprecated since Spark 3.0 in favour of SparkSession, so
# the variable name is kept for the rest of the notebook but it now holds the
# session that runs on top of `sc`; `.read` is available on it as before.
sqlContext = SparkSession.builder.getOrCreate()



# **Loading data files into different data frames**

In [None]:
# Read the artists file into a DataFrame.
#   header=True -> the first row supplies the column names
#   sep="\t"    -> fields are separated by tab characters
df_artists = (
    sqlContext.read
    .options(header=True, sep="\t")
    .csv('artists.dat')
)
df_artists.show(n=10)

+---+-----------------+--------------------+--------------------+
| id|             name|                 url|          pictureURL|
+---+-----------------+--------------------+--------------------+
|  1|     MALICE MIZER|http://www.last.f...|http://userserve-...|
|  2|  Diary of Dreams|http://www.last.f...|http://userserve-...|
|  3|Carpathian Forest|http://www.last.f...|http://userserve-...|
|  4|     Moi dix Mois|http://www.last.f...|http://userserve-...|
|  5|      Bella Morte|http://www.last.f...|http://userserve-...|
|  6|        Moonspell|http://www.last.f...|http://userserve-...|
|  7|   Marilyn Manson|http://www.last.f...|http://userserve-...|
|  8|      DIR EN GREY|http://www.last.f...|http://userserve-...|
|  9|      Combichrist|http://www.last.f...|http://userserve-...|
| 10|          Grendel|http://www.last.f...|http://userserve-...|
+---+-----------------+--------------------+--------------------+
only showing top 10 rows



In [None]:
# Read the tab-separated tag dictionary; the first row supplies the column names.
df_tags = (
    sqlContext.read
    .options(header=True, sep="\t")
    .csv('tags.dat')
)
df_tags.show(n=20)

+-----+------------------+
|tagID|          tagValue|
+-----+------------------+
|    1|             metal|
|    2| alternative metal|
|    3|         goth rock|
|    4|       black metal|
|    5|       death metal|
|    6|  industrial metal|
|    7|      gothic metal|
|    8|        terror ebm|
|    9|electro-industrial|
|   10|         harsh ebm|
|   11|  post black metal|
|   12|         aggrotech|
|   13|          chillout|
|   14|           ambient|
|   15|         downtempo|
|   16|          new wave|
|   17|         synth pop|
|   18|        electronic|
|   19|              80's|
|   20|            lounge|
+-----+------------------+
only showing top 20 rows



In [None]:
# Read the tab-separated user/artist listening weights; the first row supplies
# the column names.
df_user_artists = (
    sqlContext.read
    .options(header=True, sep="\t")
    .csv('user_artists.dat')
)
df_user_artists.show(n=10)

+------+--------+------+
|userID|artistID|weight|
+------+--------+------+
|     2|      51| 13883|
|     2|      52| 11690|
|     2|      53| 11351|
|     2|      54| 10300|
|     2|      55|  8983|
|     2|      56|  6152|
|     2|      57|  5955|
|     2|      58|  4616|
|     2|      59|  4337|
|     2|      60|  4147|
+------+--------+------+
only showing top 10 rows



In [None]:
# Read the tab-separated user/friend pairs; the first row supplies the column names.
df_user_friends = (
    sqlContext.read
    .options(header=True, sep="\t")
    .csv('user_friends.dat')
)
df_user_friends.show(n=10)

+------+--------+
|userID|friendID|
+------+--------+
|     2|     275|
|     2|     428|
|     2|     515|
|     2|     761|
|     2|     831|
|     2|     909|
|     2|    1209|
|     2|    1210|
|     2|    1230|
|     2|    1327|
+------+--------+
only showing top 10 rows



In [None]:
# Read the tab-separated tagging events (with day/month/year columns); the
# first row supplies the column names.
df_user_taggedartists = (
    sqlContext.read
    .options(header=True, sep="\t")
    .csv('user_taggedartists.dat')
)
df_user_taggedartists.show(n=10)

+------+--------+-----+---+-----+----+
|userID|artistID|tagID|day|month|year|
+------+--------+-----+---+-----+----+
|     2|      52|   13|  1|    4|2009|
|     2|      52|   15|  1|    4|2009|
|     2|      52|   18|  1|    4|2009|
|     2|      52|   21|  1|    4|2009|
|     2|      52|   41|  1|    4|2009|
|     2|      63|   13|  1|    4|2009|
|     2|      63|   14|  1|    4|2009|
|     2|      63|   23|  1|    4|2009|
|     2|      63|   40|  1|    4|2009|
|     2|      73|   13|  1|    4|2009|
+------+--------+-----+---+-----+----+
only showing top 10 rows



In [None]:
# Read the tab-separated tagging events (timestamp variant); the first row
# supplies the column names.
df_user_taggedartiststime = (
    sqlContext.read
    .options(header=True, sep="\t")
    .csv('user_taggedartists-timestamps.dat')
)
df_user_taggedartiststime.show(n=10)

+------+--------+-----+-------------+
|userID|artistID|tagID|    timestamp|
+------+--------+-----+-------------+
|     2|      52|   13|1238536800000|
|     2|      52|   15|1238536800000|
|     2|      52|   18|1238536800000|
|     2|      52|   21|1238536800000|
|     2|      52|   41|1238536800000|
|     2|      63|   13|1238536800000|
|     2|      63|   14|1238536800000|
|     2|      63|   23|1238536800000|
|     2|      63|   40|1238536800000|
|     2|      73|   13|1238536800000|
+------+--------+-----+-------------+
only showing top 10 rows



# **1. Return the ten artists that received the most tags in the last 5 years**

In [None]:
# Preview the tagging events that this query works on (first 20 rows).
df_user_taggedartists.show(n=20)

+------+--------+-----+---+-----+----+
|userID|artistID|tagID|day|month|year|
+------+--------+-----+---+-----+----+
|     2|      52|   13|  1|    4|2009|
|     2|      52|   15|  1|    4|2009|
|     2|      52|   18|  1|    4|2009|
|     2|      52|   21|  1|    4|2009|
|     2|      52|   41|  1|    4|2009|
|     2|      63|   13|  1|    4|2009|
|     2|      63|   14|  1|    4|2009|
|     2|      63|   23|  1|    4|2009|
|     2|      63|   40|  1|    4|2009|
|     2|      73|   13|  1|    4|2009|
|     2|      73|   14|  1|    4|2009|
|     2|      73|   15|  1|    4|2009|
|     2|      73|   18|  1|    4|2009|
|     2|      73|   20|  1|    4|2009|
|     2|      73|   21|  1|    4|2009|
|     2|      73|   22|  1|    4|2009|
|     2|      73|   26|  1|    4|2009|
|     2|      94|   13|  1|    4|2009|
|     2|      94|   15|  1|    4|2009|
|     2|      94|   20|  1|    4|2009|
+------+--------+-----+---+-----+----+
only showing top 20 rows



In [None]:
# List every distinct tagging year in ascending order, so the five most recent
# years can be read off the result.
years = (
    df_user_taggedartists
    .select(col("year"))
    .dropDuplicates()
    .orderBy(col("year").asc())
)

In [None]:
# Display the distinct years (up to 20 rows).
years.show(n=20)

+----+
|year|
+----+
|1956|
|1957|
|1979|
|2005|
|2006|
|2007|
|2008|
|2009|
|2010|
|2011|
+----+



In [None]:
# Keep only the tagging events from the five most recent years in the data.
# The `year` column holds strings, so it is matched against string values.
query = df_user_taggedartists.where(
    col("year").isin("2007", "2008", "2009", "2010", "2011")
)

In [None]:
# Preview the year-filtered tagging events (first 20 rows).
query.show(n=20)

+------+--------+-----+---+-----+----+
|userID|artistID|tagID|day|month|year|
+------+--------+-----+---+-----+----+
|     2|      52|   13|  1|    4|2009|
|     2|      52|   15|  1|    4|2009|
|     2|      52|   18|  1|    4|2009|
|     2|      52|   21|  1|    4|2009|
|     2|      52|   41|  1|    4|2009|
|     2|      63|   13|  1|    4|2009|
|     2|      63|   14|  1|    4|2009|
|     2|      63|   23|  1|    4|2009|
|     2|      63|   40|  1|    4|2009|
|     2|      73|   13|  1|    4|2009|
|     2|      73|   14|  1|    4|2009|
|     2|      73|   15|  1|    4|2009|
|     2|      73|   18|  1|    4|2009|
|     2|      73|   20|  1|    4|2009|
|     2|      73|   21|  1|    4|2009|
|     2|      73|   22|  1|    4|2009|
|     2|      73|   26|  1|    4|2009|
|     2|      94|   13|  1|    4|2009|
|     2|      94|   15|  1|    4|2009|
|     2|      94|   20|  1|    4|2009|
+------+--------+-----+---+-----+----+
only showing top 20 rows



In [None]:
# Count how many tag assignments each artist received in the selected years.
query1 = (
    query
    .groupBy(col("artistID"))
    .agg(count(col("tagID")).alias("tag_count"))
)



In [None]:
# Preview the per-artist tag counts (first 20 rows, not yet ordered).
query1.show(n=20)

+--------+---------+
|artistID|tag_count|
+--------+---------+
|     467|       52|
|     691|      114|
|    7711|        5|
|     675|        3|
|    1090|      233|
|    1512|       68|
|    2136|       53|
|    5925|       45|
|    3606|        6|
|   15269|       15|
|    6194|        9|
|   11332|        5|
|    1572|       11|
|   11722|       13|
|     296|       93|
|    6240|        8|
|    3959|        2|
|    9030|        4|
|     829|        9|
|    1159|       26|
+--------+---------+
only showing top 20 rows



In [None]:
# Order the artists from most to least tagged.
query1 = query1.sort(col("tag_count").desc())

# The ten most-tagged artists are the first ten rows of that ordering.
top_ten_df = query1.limit(10)

# Display the result.
top_ten_df.show(n=20)


+--------+---------+
|artistID|tag_count|
+--------+---------+
|     289|      886|
|      89|      767|
|     292|      744|
|      67|      681|
|      72|      662|
|     190|      568|
|     227|      545|
|     154|      544|
|     288|      522|
|     498|      463|
+--------+---------+



# **2. Return, for each artist, the users listening to them, sorted by listening count**

In [None]:
# Preview the user/artist listening weights (first 20 rows).
df_user_artists.show(n=20)

+------+--------+------+
|userID|artistID|weight|
+------+--------+------+
|     2|      51| 13883|
|     2|      52| 11690|
|     2|      53| 11351|
|     2|      54| 10300|
|     2|      55|  8983|
|     2|      56|  6152|
|     2|      57|  5955|
|     2|      58|  4616|
|     2|      59|  4337|
|     2|      60|  4147|
|     2|      61|  3923|
|     2|      62|  3782|
|     2|      63|  3735|
|     2|      64|  3644|
|     2|      65|  3579|
|     2|      66|  3312|
|     2|      67|  3301|
|     2|      68|  2927|
|     2|      69|  2720|
|     2|      70|  2686|
+------+--------+------+
only showing top 20 rows



In [None]:
# Inspect the column types of the listening table before aggregating on `weight`.
df_user_artists.printSchema()

root
 |-- userID: string (nullable = true)
 |-- artistID: string (nullable = true)
 |-- weight: string (nullable = true)



In [None]:
# NOTE: this `sum` is Spark's column aggregate; from here on it shadows
# Python's built-in `sum` in the notebook namespace.
from pyspark.sql.functions import sum, expr

# `weight` was loaded as a string column; convert it to an integer so that it
# can be summed numerically.
df = df_user_artists.withColumn("weight", col("weight").cast("int"))

In [None]:
# Total listening count for every (artist, user) pair.
group = (
    df.groupBy("artistID", "userID")
    .agg(sum(col("weight")).alias("total_listening_count"))
)

# Highest listening counts first.
sort = group.sort(desc("total_listening_count"))

# Display the result.
sort.show(n=20)

+--------+------+---------------------+
|artistID|userID|total_listening_count|
+--------+------+---------------------+
|      72|  1642|               352698|
|     792|  2071|               324663|
|     511|  1094|               320725|
|     203|  1905|               257978|
|     498|  1664|               227829|
|     378|  1146|               203165|
|     292|   514|               176133|
|     701|  1983|               172496|
|     701|   757|               169596|
|     679|  1086|               165902|
|     503|   903|               146411|
|     687|   946|               144559|
|     289|   542|               131733|
|     378|   938|               129595|
|     289|  2031|               128654|
|     486|  1368|               125471|
|     163|   292|               123065|
|      89|  1135|               114672|
|     187|   442|               112009|
|     198|   459|               110185|
+--------+------+---------------------+
only showing top 20 rows



### **Displaying names of artists as well.**

In [None]:
# Attach the artist details by matching each listening row's artistID to the
# artist table's id.
join = df.join(df_artists, on=df["artistID"] == df_artists["id"], how="inner")

# Total listening count for every (artist, artist name, user) combination.
group = (
    join.groupBy("artistID", "name", "userID")
    .agg(sum(col("weight")).alias("total_listening_count"))
)

# Highest listening counts first.
sort = group.sort(desc("total_listening_count"))

# Display the result.
sort.show(n=20)

+--------+------------------+------+---------------------+
|artistID|              name|userID|total_listening_count|
+--------+------------------+------+---------------------+
|      72|      Depeche Mode|  1642|               352698|
|     792|            Thalía|  2071|               324663|
|     511|                U2|  1094|               320725|
|     203|              Blur|  1905|               257978|
|     498|          Paramore|  1664|               227829|
|     378|       Evanescence|  1146|               203165|
|     292|Christina Aguilera|   514|               176133|
|     701|           Shakira|  1983|               172496|
|     701|           Shakira|   757|               169596|
|     679|         Glee Cast|  1086|               165902|
|     503|         In Flames|   903|               146411|
|     687|      All Time Low|   946|               144559|
|     289|    Britney Spears|   542|               131733|
|     378|       Evanescence|   938|               129595|

In [None]:
# Total number of rows in the sorted (artist, name, user) listening table.
sort.count()

92834

# **3. All users, sorted by the number of their friends.**

In [None]:
# Preview the user/friend pairs (first 20 rows).
df_user_friends.show(n=20)

+------+--------+
|userID|friendID|
+------+--------+
|     2|     275|
|     2|     428|
|     2|     515|
|     2|     761|
|     2|     831|
|     2|     909|
|     2|    1209|
|     2|    1210|
|     2|    1230|
|     2|    1327|
|     2|    1585|
|     2|    1625|
|     2|    1869|
|     3|      78|
|     3|     255|
|     3|     460|
|     3|     837|
|     3|    1740|
|     3|    1801|
|     3|    1975|
+------+--------+
only showing top 20 rows



In [None]:
# Number of friend rows per user, smallest counts first.
friends = (
    df_user_friends
    .groupBy('userID')
    .count()
    .orderBy(col("count").asc())
)

In [None]:
# Preview the first 15 users of the ascending friend-count ordering.
friends.show(n=15)

+------+-----+
|userID|count|
+------+-----+
|    29|    1|
|  1814|    1|
|   356|    1|
|  1957|    1|
|  1692|    1|
|   979|    1|
|  1729|    1|
|  2079|    1|
|   406|    1|
|   672|    1|
|  1012|    1|
|  1518|    1|
|    34|    1|
|   786|    1|
|  1977|    1|
+------+-----+
only showing top 15 rows



In [None]:
# Number of distinct users that appear in the userID column of the friends table.
friends.count()

1892

###  **Users with most friends**

In [None]:
# Re-order by friend count, largest first, and display the first 20 users.
friends.sort(desc("count")).show(n=20)

+------+-----+
|userID|count|
+------+-----+
|  1543|  119|
|  1281|  110|
|   831|  106|
|   179|   97|
|  1503|   95|
|  1023|   91|
|   405|   90|
|  1895|   90|
|  1300|   89|
|   390|   88|
|   232|   87|
|  1258|   86|
|  1568|   85|
|    46|   84|
|   851|   84|
|   749|   82|
|  1247|   82|
|    78|   81|
|   236|   81|
|  1478|   80|
+------+-----+
only showing top 20 rows



### **Users with fewest friends**

In [None]:
# Re-order by friend count, smallest first, and display the first 20 users.
friends.sort(col("count").asc()).show(n=20)

+------+-----+
|userID|count|
+------+-----+
|  1692|    1|
|   556|    1|
|  1729|    1|
|   613|    1|
|   406|    1|
|   282|    1|
|  1012|    1|
|  2079|    1|
|    34|    1|
|   672|    1|
|   652|    1|
|  1518|    1|
|   786|    1|
|   979|    1|
|   737|    1|
|  1747|    1|
|  1764|    1|
|  1957|    1|
|  1977|    1|
|   356|    1|
+------+-----+
only showing top 20 rows



# **4. Special Query**

## **Return the top three most popular tags among users, based on the number of tag occurrences**

In [None]:

# Attach the tag text to every tagging event via the shared tagID column.
joined_df = df_user_taggedartists.join(df_tags, on="tagID", how="inner")

# Number of tagging events recorded for each tag value.
grouped_df = (
    joined_df
    .groupBy(col("tagValue"))
    .agg(count("*").alias("tagOccurrences"))
)

# Most frequently used tags first.
sorted_df = grouped_df.sort(desc("tagOccurrences"))

# Keep only the three most used tags.
top_three_df = sorted_df.limit(3)

# Display the result.
top_three_df.show(n=20)

+-----------+--------------+
|   tagValue|tagOccurrences|
+-----------+--------------+
|       rock|          7503|
|        pop|          5418|
|alternative|          5251|
+-----------+--------------+

