In [None]:
# DO NOT RUN THIS CELL
#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('PySparkShell').getOrCreate()
sc = spark.sparkContext
#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!#!

In [1]:
spark

## Setup

In [9]:
import datetime
import operator

import pyspark.sql.functions as F

In [11]:
def parse_date(s):
    dt = datetime.datetime.strptime(s[:10], '%Y-%m-%d')
    return dt.date()

In [3]:
charts = spark.read.parquet('files/charts.parquet')
# charts = charts.toDF('id', 'title', 'position', 'date', 'countryId', 'chartName', 'movement', 'streams')
charts.createOrReplaceTempView('charts')
chartsRDD = sc.textFile('files/charts.csv').map(lambda line: line.split(','))

list(enumerate(charts.columns))

[(0, 'songId'),
 (1, 'title'),
 (2, 'position'),
 (3, 'date'),
 (4, 'countryId'),
 (5, 'chartName'),
 (6, 'movement'),
 (7, 'streams')]

In [4]:
songId = 0
title = 1
position = 2
date = 3
countryId = 4
chartName = 5
movement = 6
streams = 7

In [5]:
regions = spark.read.parquet('files/regions.parquet')
regions.createOrReplaceTempView('regions')
regionsRDD = sc.textFile('files/regions.csv').map(lambda line: line.split(','))

list(enumerate(regions.columns))

[(0, 'countryId'), (1, 'countryName')]

In [6]:
chart_artist_mapping = spark.read.parquet('files/chart_artist_mapping.parquet')
chart_artist_mapping.createOrReplaceTempView('chart_artist_mapping')
chart_artist_mappingRDD = sc.textFile('files/chart_artist_mapping.csv').map(lambda line: line.split(','))

list(enumerate(chart_artist_mapping.columns))

[(0, 'songId'), (1, 'artistId')]

In [7]:
artists = spark.read.parquet('files/artists.parquet')
artists.createOrReplaceTempView('artists')
artistsRDD = sc.textFile('files/artists.csv').map(lambda line: line.split(','))

list(enumerate(artists.columns))

[(0, 'artistId'), (1, 'artistName')]

## Query 1

In [7]:
spark.sql('''
    SELECT sum(streams) FROM charts
    WHERE chartName="top200" AND title="Shape of You"
''').show()

+------------+
|sum(streams)|
+------------+
|  2324245979|
+------------+



In [8]:
(
    chartsRDD
     .filter(lambda x: x[chartName] == 'top200' and x[title] == 'Shape of You')
     .map(lambda x: int(x[streams]))
     .reduce(operator.add)
)

2324245979

## Query 2

In [9]:
(
    charts
     .filter(F.col('position') == 1)
     .groupBy(['chartName', 'songId'])
     .count()
     .withColumn('avgCount', F.col('count') / 69)
     .groupBy('chartName')
     .agg(F.max('avgCount'))
     .show()
)

+---------+------------------+
|chartName|     max(avgCount)|
+---------+------------------+
|   top200|  54.2463768115942|
|  viral50|24.985507246376812|
+---------+------------------+



In [10]:
spark.sql('''
    SELECT chartName, first(title) title, max(avgCount) maxAvgTime
    FROM (
        SELECT chartName, first(title) title, count(*)/69 avgCount
        FROM charts
        WHERE position=1
        GROUP BY chartName, songId
    )
    GROUP BY chartName
''').show()

+---------+--------------------+------------------+
|chartName|               title|        maxAvgTime|
+---------+--------------------+------------------+
|   top200|         100 zhivota|  54.2463768115942|
|  viral50|"""Was He Slow?""...|24.985507246376812|
+---------+--------------------+------------------+



In [11]:
(
    chartsRDD
     .filter(lambda x: x[position] == '1')
     .map(lambda x: ((x[chartName], x[title]), 1))
     .reduceByKey(operator.add)
     .map(lambda x: (x[0][0], (x[0][1], x[1]/69)))
     .reduceByKey(lambda x, y: x if x[1] > y[1] else y)
     .map(lambda x: (x[0], *x[1]))
     .collect()
)

[('viral50', 'Calma - Remix', 24.985507246376812),
 ('top200', 'Shape of You', 54.2463768115942)]

## Query 3

In [12]:
(
    charts
     .filter((F.col('position') == 1) & (F.col('chartName') == 'top200'))
     .groupBy('date')
     .agg(F.sum('streams'))
     .groupBy([F.year('date'), F.month('date')])
     .agg(F.mean('sum(streams)'))
     .orderBy([F.year('date'), F.month('date')])
     .show(3)
)

+----------+-----------+-----------------+
|year(date)|month(date)|avg(sum(streams))|
+----------+-----------+-----------------+
|      2017|          1|7618611.064516129|
|      2017|          2|8876450.785714285|
|      2017|          3| 8955476.41935484|
+----------+-----------+-----------------+
only showing top 3 rows



In [17]:
(
    chartsRDD
     .filter(lambda x: x[position] == '1' and x[chartName] == 'top200')
     .map(lambda x: (parse_date(x[date]), int(x[streams])))
     .reduceByKey(operator.add)
     .map(lambda x: ((x[0].year, x[0].month), (x[1], 1)))
     .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
     .sortByKey()
     .map(lambda x:(x[0][0], x[0][1], x[1][0] / x[1][1]))
     .take(3)
)

[(2017, 1, 7618611.064516129),
 (2017, 2, 8876450.785714285),
 (2017, 3, 8955476.41935484)]

In [16]:
spark.sql('''
    SELECT year(date), month(date), sum(streamsDay) streams1, count(*)
    FROM (
        SELECT date, sum(streams) streamsDay
        FROM charts
        WHERE position = 1 AND chartName == "top200"
        GROUP BY date
    )
    GROUP BY year(date), month(date)
    ORDER BY year(date), month(date)
''').show(3)

+----------+-----------+---------+--------+
|year(date)|month(date)| streams1|count(1)|
+----------+-----------+---------+--------+
|      2017|          1|236176943|      31|
|      2017|          2|248540622|      28|
|      2017|          3|277619769|      31|
+----------+-----------+---------+--------+
only showing top 3 rows



## Query 4

In [16]:
spark.sql('''
    SELECT countryName, songId songId, title, maxCount
    FROM (
        SELECT countryId, songId, title, cnt, 
            max(cnt) OVER (PARTITION BY countryId) AS maxCount 
        FROM (
            SELECT countryId, songId, first(title) title, count(*) cnt
            FROM charts
            WHERE chartName = "viral50"
            GROUP BY countryId, songId
        )
    ) a
    LEFT JOIN regions
    ON a.countryId = regions.countryId
    WHERE cnt = maxCount
    ORDER BY countryName, title
''').show(3)

+-----------+------+--------------------+--------+
|countryName|songId|               title|maxCount|
+-----------+------+--------------------+--------+
|    Andorra| 55526|Friday (feat. Muf...|     251|
|  Argentina| 35851|        Dance Monkey|     253|
|  Australia| 35851|        Dance Monkey|     217|
+-----------+------+--------------------+--------+
only showing top 3 rows



In [77]:
# (countryId, songId), (title, count)
counts = (
    chartsRDD
     .filter(lambda x: x[chartName] == 'viral50')
     .map(lambda x: ((x[countryId], x[songId]), (x[title], 1)))
     .reduceByKey(lambda x, y: (x[0], x[1] + y[1]))
     .map(lambda x: (x[0][0], (x[0][1], *x[1])))
)

# countryId, maxCount
maxCounts = (
    counts
     .map(lambda x: (x[0], x[1][2]))
     .reduceByKey(max)
)

# Joining now because both tables are tiny and of equal index

# countryId, (countryName, countryId)
maxCountsNamed = regionsRDD.join(maxCounts)

q4 = (
    counts
     .join(maxCountsNamed)
     .filter(lambda x: x[1][0][2] == x[1][1][1])
     .map(lambda x: (x[1][1][0], x[1][0][0], x[1][0][1], x[1][1][1]))
     .sortBy(lambda x: (x[0], x[1]))
)

q4.take(3)

[('Andorra', '55526', 'Friday (feat. Mufasa;Hypeman) - Dopamine Re-Edit', 251),
 ('Argentina', '35851', 'Dance Monkey', 253),
 ('Australia', '35851', 'Dance Monkey', 217)]

## Query 5

In [13]:
spark.sql('''
    SELECT yr, artistName, maxAvgStreams
    FROM (
        SELECT yr, artistId, avgStreams,
            max(avgStreams) OVER (PARTITION BY yr) AS maxAvgStreams 
        FROM (
            SELECT year(date) yr, artistId, sum(streams)/69 avgStreams
            FROM charts
            JOIN chart_artist_mapping
            ON charts.songId = chart_artist_mapping.songId
            WHERE chartName = "top200"
            GROUP BY year(date), artistId
        )
    ) a
    JOIN artists
    ON a.artistId = artists.artistId
    WHERE avgStreams = maxAvgStreams
    ORDER BY yr
''').show()

+----+--------------+--------------------+
|  yr|    artistName|       maxAvgStreams|
+----+--------------+--------------------+
|2017|    Ed Sheeran|6.2263262666666664E7|
|2018|   Post Malone| 6.812695868115942E7|
|2019|   Post Malone|6.6283253927536234E7|
|2020|     Bad Bunny| 7.794363488405797E7|
|2021|Olivia Rodrigo| 6.446307111594203E7|
+----+--------------+--------------------+



In [18]:
# (year, artistId), avgStreams

avgStreams = (
    chartsRDD
     .filter(lambda x: x[chartName] == 'top200')
     .map(lambda x: (x[songId], (parse_date(x[date]).year, int(x[streams]))))
     .join(chart_artist_mappingRDD)  # songId, ((year, streams), artistId)
     .map(lambda x: ((x[1][0][0], x[1][1]), x[1][0][1]/69))  # (year, artistId), streams
     .reduceByKey(operator.add)
)

# year, maxAvgStreams
maxAvgStreams = (
    avgStreams
     .map(lambda x: (x[0][0], x[1]))
     .reduceByKey(max)
)

# (year, artistName, maxAvgStreams)
q5 = (
    avgStreams
     .map(lambda x: (x[1], x[0]))  # avgStreams, (year, artistId)
     .join(maxAvgStreams.map(lambda x: (x[1], None))) # avgStreams, ((year, artistId), None)
     .map(lambda x: (x[1][0][1], (x[1][0][0], x[0])))  # artistId, (year, maxAvgStreams)
     .join(artistsRDD)  # artistId, ((year, maxAvgStreams), artistName)
     .map(lambda x: (x[1][0][0], x[1][1], x[1][0][1]))  # (year, artistName, maxAvgStreams)
     .sortBy(lambda x: (x[0], x[1]))
)

q5.collect()

[(2017, 'Ed Sheeran', 62263262.66666655),
 (2018, 'Post Malone', 68126958.68115947),
 (2020, 'Bad Bunny', 77943634.88405786),
 (2021, 'Olivia Rodrigo', 64463071.11594206)]

## Query 6