In [None]:
from pyspark.sql.types import LongType

In [None]:
# Spark SQL solution: input files (read later as header-less CSV) ...
inUsers, inSongs, inListenToSongs = "Users.txt", "Songs.txt", "ListenToSongs.txt"

# ... and the output folders, one per part of the exercise
outputPathPart1, outputPathPart2 = "outPart1SQL/", "outPart2SQL/"

In [None]:
# Part 1

In [None]:
# Load the users file as CSV (no header row, column types inferred by Spark)
# and give the positional _cN columns meaningful names.
usersDF = (
    spark.read
    .format("csv")
    .options(header=False, inferSchema=True)
    .load(inUsers)
    .withColumnRenamed("_c0", "UID")
    .withColumnRenamed("_c1", "Name")
    .withColumnRenamed("_c2", "Surname")
    .withColumnRenamed("_c3", "Gender")
    .withColumnRenamed("_c4", "YearOfBirth")
)

In [None]:
# Load the songs file as CSV (no header row, column types inferred by Spark)
# and give the positional _cN columns meaningful names.
songsDF = (
    spark.read
    .format("csv")
    .options(header=False, inferSchema=True)
    .load(inSongs)
    .withColumnRenamed("_c0", "SID")
    .withColumnRenamed("_c1", "Title")
    .withColumnRenamed("_c2", "MusicGenre")
)

In [None]:
# Load the listening events as CSV (no header row, column types inferred by
# Spark) and give the positional _cN columns meaningful names.
# The DataFrame is cached because both Part 1 and Part 2 query it.
listenDF = (
    spark.read
    .format("csv")
    .options(header=False, inferSchema=True)
    .load(inListenToSongs)
    .withColumnRenamed("_c0", "SID")
    .withColumnRenamed("_c1", "UID")
    .withColumnRenamed("_c2", "StartTimestamp")
    .withColumnRenamed("_c3", "EndTimestamp")
    .cache()
)

In [None]:
# Expose the three input DataFrames to Spark SQL as temporary views
# (an existing view with the same name is replaced)
usersDF.createOrReplaceTempView(name="users")
songsDF.createOrReplaceTempView(name="songs")
listenDF.createOrReplaceTempView(name="listen")

In [None]:
# Select the songs never listened to by young users in the last two years.
#
# The inner query (listenByYoung) collects the SIDs of the listening events made
# by users born after 1999 whose StartTimestamp falls in the window
# [2018/09/14, 2020/09/14). The LEFT ANTI JOIN then keeps only the songs whose
# SID does not appear in that set.
#
# StartTimestamp is handled as a string in this notebook (yearFunc splits it on
# "/"), so the bounds are compared lexicographically. The upper bound is written
# as an exclusive bound on the following day: `<= '2020/09/13'` would reject any
# value of the form '2020/09/13<time suffix>', because a longer string sorts
# after its own prefix. For date-only values both forms select the same rows.
# NOTE(review): assumes zero-padded 'YYYY/MM/DD...' values - confirm against the data.
# NOTE(review): the name midsIncomesDF does not describe the content (song IDs);
# it is kept unchanged because other cells reference it.
midsIncomesDF = spark.sql("""SELECT SID
FROM songs
     LEFT ANTI JOIN
     (SELECT SID
      FROM listen
           JOIN users ON listen.UID = users.UID
      WHERE StartTimestamp >= '2018/09/14'
      AND StartTimestamp < '2020/09/14'
      AND YearOfBirth > 1999) listenByYoung
     ON songs.SID = listenByYoung.SID""")

In [None]:
#midsIncomesDF.show()

In [None]:
# Store the result of Part 1 as CSV files without a header row.
# mode="overwrite" replaces the output of a previous run: with the default
# save mode the write fails as soon as the output folder already exists, so the
# notebook could not be re-executed from a fresh kernel.
midsIncomesDF.write.csv(outputPathPart1, mode="overwrite", header=False)

In [None]:
# Part 2

In [None]:
# Define a UDF that extracts the year part from a timestamp string
def yearFunc(timestamp):
    """Return the year of a '/'-separated timestamp string as an int.

    Args:
        timestamp: string whose first '/'-separated field is the year
            (e.g. "2019/05/01"), or None for a NULL column value.

    Returns:
        The leading field converted to int, or None when timestamp is None.

    Raises:
        ValueError: if the leading field is not a valid integer.
    """
    # Spark hands SQL NULLs to Python UDFs as None; propagate the NULL instead
    # of failing the whole job with an AttributeError on None.split.
    if timestamp is None:
        return None
    # Only the first field is needed, so stop splitting after the first "/".
    return int(timestamp.split("/", 1)[0])
    
# Make yearFunc callable from SQL text under the same name; results are 64-bit integers
spark.udf.register(name="yearFunc", f=yearFunc, returnType=LongType())

In [None]:
# Compute for each song its popularity in each year, 
# i.e., the number of distinct users who listened to it in that year

In [None]:
# Remove duplicates. We are interested in the number of distinct users for each (song, year).
# The query projects every listening event to (SID, Year, UID), where Year is
# computed from StartTimestamp by the yearFunc SQL function, and DISTINCT keeps
# a single row per triple, so a user who listened to the same song several
# times in one year appears only once.
sidYearDistinctUserDF = spark.sql("""
SELECT DISTINCT SID,
yearFunc(StartTimestamp) as Year,
UID 
FROM listen""")

In [None]:
# Expose the de-duplicated (SID, Year, UID) rows to Spark SQL as "listenNoDuplicates"
sidYearDistinctUserDF.createOrReplaceTempView(name="listenNoDuplicates")

In [None]:
# Count the number of distinct users for each pair (SID, Year).
# COUNT(*) is enough here because listenNoDuplicates is built with
# SELECT DISTINCT over (SID, Year, UID): each row of a group is a different user.
# The count is exposed as YearlyPopularity.
sidYearPopularityDF = spark.sql("""SELECT SID, Year, COUNT(*) as YearlyPopularity
FROM listenNoDuplicates
GROUP BY SID, Year""")

In [None]:
#sidYearPopularityDF.show()

In [None]:
# Expose the per-(SID, Year) popularity to Spark SQL as "SIDYearlyPopularity"
sidYearPopularityDF.createOrReplaceTempView(name="SIDYearlyPopularity")

In [None]:
# Compute the highest yearly popularity for each song.
# Produces one row per SID; MaxPop is the maximum YearlyPopularity over all the
# years in which the song appears in SIDYearlyPopularity.
sidMaxPopDF = spark.sql("""SELECT SID, MAX(YearlyPopularity) as MaxPop
FROM SIDYearlyPopularity
GROUP BY SID""")

In [None]:
#sidMaxPopDF.show()

In [None]:
# Expose the per-song maximum popularity to Spark SQL as "SIDMaxYearlyPopularity"
sidMaxPopDF.createOrReplaceTempView(name="SIDMaxYearlyPopularity")

In [None]:
# Select for each song the first year associated with its highest popularity.
# The yearly rows of each song are matched with that song's maximum on SID, and
# only the years whose YearlyPopularity equals MaxPop are kept; when several
# years tie for the maximum, MIN(Year) picks the earliest one.
# The MIN(Year) column has no explicit alias in the query.
sidFirstYearMaxPopDF = spark.sql("""SELECT SIDYearlyPopularity.SID, MIN(Year) 
FROM SIDYearlyPopularity,SIDMaxYearlyPopularity
WHERE SIDYearlyPopularity.SID=SIDMaxYearlyPopularity.SID
AND SIDYearlyPopularity.YearlyPopularity=SIDMaxYearlyPopularity.MaxPop
GROUP BY SIDYearlyPopularity.SID""")

In [None]:
#sidFirstYearMaxPopDF.show()

In [None]:
# Store the result of Part 2 as CSV files without a header row.
# mode="overwrite" replaces the output of a previous run: with the default
# save mode the write fails as soon as the output folder already exists, so the
# notebook could not be re-executed from a fresh kernel.
sidFirstYearMaxPopDF.write.csv(outputPathPart2, mode="overwrite", header=False)