In [1]:
import pyspark as ps
import pyspark.sql.functions as f
from pyspark import SQLContext
from pyspark.sql.types import IntegerType, DateType, TimestampType
from datetime import datetime

In [2]:
spark = (ps.sql.SparkSession.builder 
        .master("local[4]") 
        .appName("sparkSQL exercise") 
        .getOrCreate()
        )
sc = spark.sparkContext

In [3]:
sc

In [4]:
sqlContext = SQLContext(sc)

In [5]:
df = sqlContext.read.csv("uk100.csv", header=True)

In [6]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- last_week_rank: string (nullable = true)
 |-- hmm: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- label: string (nullable = true)
 |-- peak_rank: string (nullable = true)
 |-- weeks_on_chart: string (nullable = true)
 |-- week_of: string (nullable = true)



In [7]:
df.createOrReplaceTempView('test')

In [8]:
spark.sql('''SELECT *
            FROM test
            LIMIT 30
            ''').show()

+---+----+--------------+--------+--------------------+--------------------+--------------------+---------+--------------+----------------+
|_c0|rank|last_week_rank|     hmm|               title|              artist|               label|peak_rank|weeks_on_chart|         week_of|
+---+----+--------------+--------+--------------------+--------------------+--------------------+---------+--------------+----------------+
|  0|   1|            11|        |               RIVER|      ELLIE GOULDING|             POLYDOR|        1|             5|December 27 2019|
|  1|   2|             8|        |ALL I WANT FOR CH...|        MARIAH CAREY|            COLUMBIA|        2|            99|December 27 2019|
|  2|   3|             5|        |      LAST CHRISTMAS|                WHAM|                 RCA|        2|            64|December 27 2019|
|  3|   4|            14|        |FAIRYTALE OF NEW ...|POGUES FT KIRSTY ...|         WARNER BROS|        2|            99|December 27 2019|
|  4|   5|          

## Drop unneeded columns and cast the rest to the correct types.

In [10]:
df = df.drop('_c0')
df = df.drop('hmm')

for col in df.columns:
    df = df.withColumn(col, f.lower(f.col(col)))

df = df.withColumn("rank", df["rank"].cast(IntegerType()))
df = df.withColumn("peak_rank", df["peak_rank"].cast(IntegerType()))
df = df.withColumn("weeks_on_chart", df["weeks_on_chart"].cast(IntegerType()))

def to_date(x):
    return datetime.strptime(x, '%B %d %Y')
change_to_datetype = f.udf(lambda y: to_date(y), DateType())

df = df.withColumn("week_of", change_to_datetype('week_of'))

In [11]:
df.printSchema()

root
 |-- rank: integer (nullable = true)
 |-- last_week_rank: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- label: string (nullable = true)
 |-- peak_rank: integer (nullable = true)
 |-- weeks_on_chart: integer (nullable = true)
 |-- week_of: date (nullable = true)



In [12]:
df.createOrReplaceTempView('test')

## Total Count

In [13]:
spark.sql('''
        SELECT COUNT(*)
        FROM test
        ''').show()

+--------+
|count(1)|
+--------+
|   26000|
+--------+



In [38]:
spark.sql('''
        SELECT *
        FROM test
        ''').show()

+----+--------------+--------------------+--------------------+--------------------+---------+--------------+----------+
|rank|last_week_rank|               title|              artist|               label|peak_rank|weeks_on_chart|   week_of|
+----+--------------+--------------------+--------------------+--------------------+---------+--------------+----------+
|   1|            11|               river|      ellie goulding|             polydor|        1|             5|2019-12-27|
|   2|             8|all i want for ch...|        mariah carey|            columbia|        2|            99|2019-12-27|
|   3|             5|      last christmas|                wham|                 rca|        2|            64|2019-12-27|
|   4|            14|fairytale of new ...|pogues ft kirsty ...|         warner bros|        2|            99|2019-12-27|
|   5|             2|              own it|stormzy/ed sheera...|      atlantic/merky|        2|             5|2019-12-27|
|   6|            16|merry chris

## Count of distinct titles (songs with same names included)

In [15]:
spark.sql('''
        SELECT COUNT(DISTINCT title)
        FROM test
        ''').show()

+---------------------+
|count(DISTINCT title)|
+---------------------+
|                 2416|
+---------------------+



## Number of distinct titles that have reached rank 1

In [16]:
spark.sql('''
        SELECT COUNT(DISTINCT title)
        FROM test
        WHERE rank == 1''').show()

+---------------------+
|count(DISTINCT title)|
+---------------------+
|                   77|
+---------------------+



## Number of unique songs

In [26]:
num_unique_songs = spark.sql('''
        SELECT artist, title
        FROM test
        GROUP BY artist, title''')

In [27]:
num_unique_songs.count()

2605

In [18]:
spark.sql('''
        SELECT COUNT(*)
        FROM(
            SELECT artist, title
            FROM test
            GROUP BY artist, title)
        ''').show()

+--------+
|count(1)|
+--------+
|    2605|
+--------+



## How many weeks did these songs stay at rank 1

In [28]:
spark.sql('''
            SELECT DISTINCT artist, title, COUNT(*) as num_weeks, MIN(week_of)
            FROM test
            WHERE rank == 1
            GROUP BY 1, 2
            ORDER BY 3 DESC
            ''').show()

+--------------------+--------------------+---------+------------+
|              artist|               title|num_weeks|min(week_of)|
+--------------------+--------------------+---------+------------+
|drake ft wizkid &...|           one dance|       15|  2016-04-15|
|          ed sheeran|        shape of you|       14|  2017-01-13|
|           tones & i|        dance monkey|       11|  2019-10-04|
|luis fonsi/daddy ...|   despacito (remix)|       11|  2017-05-12|
|        clean bandit|            rockabye|        9|  2016-11-11|
|               drake|          god's plan|        9|  2018-01-26|
|ed sheeran & just...|        i don't care|        8|  2019-05-17|
|calvin harris & d...|            one kiss|        8|  2018-04-20|
|       lewis capaldi|   someone you loved|        7|  2019-03-01|
|       justin bieber|       love yourself|        6|  2015-12-04|
|shawn mendes/cami...|            senorita|        6|  2019-07-12|
|       ariana grande|        thank u next|        6|  2018-11

## Total number of appearances on the top 100, in order

In [31]:
spark.sql('''
            SELECT DISTINCT artist, title, COUNT(*) as num_weeks, MIN(week_of)
            FROM test
            WHERE peak_rank == 1
            GROUP BY 1,2
            ORDER BY 3 DESC
            ''').show()

+--------------------+-------------------+---------+------------+
|              artist|              title|num_weeks|min(week_of)|
+--------------------+-------------------+---------+------------+
|          ed sheeran|       shape of you|       97|  2017-01-13|
|          ed sheeran|  thinking out loud|       90|  2015-01-11|
|          ed sheeran|            perfect|       81|  2017-12-08|
|         george ezra|            shotgun|       74|  2018-06-29|
|mark ronson ft br...|        uptown funk|       72|  2015-01-11|
|luis fonsi/daddy ...|  despacito (remix)|       71|  2017-05-12|
|       justin bieber|              sorry|       65|  2015-11-20|
|drake ft wizkid &...|          one dance|       64|  2016-04-15|
|       justin bieber|      love yourself|       62|  2015-12-04|
|       justin bieber|   what do you mean|       62|  2015-09-04|
|            dua lipa|          new rules|       61|  2017-08-18|
|       years & years|               king|       59|  2015-03-08|
|lady gaga

## Average number of weeks in the top 100 (songs that hit rank 1)

In [33]:
spark.sql('''
            SELECT AVG(num_weeks)
            FROM (SELECT DISTINCT title, COUNT(*) as num_weeks, MIN(week_of)
                    FROM test
                    WHERE peak_rank == 1
                    GROUP BY 1
                    ORDER BY 2 DESC)
            ''').show()

+------------------+
|    avg(num_weeks)|
+------------------+
|26.708661417322833|
+------------------+



## Average rank for the song "Shape of You" by Ed Sheeran

In [43]:
spark.sql('''
            SELECT ROUND(AVG(rank),2), MAX(weeks_on_chart)
            FROM test
            WHERE title = "shape of you"
            ''').show()

+-----------------------------------+-------------------+
|round(avg(CAST(rank AS BIGINT)), 2)|max(weeks_on_chart)|
+-----------------------------------+-------------------+
|                              42.52|                 97|
+-----------------------------------+-------------------+



## Average rank and number of weeks on chart

In [None]:
spark.sql('''
            SELECT AVG()''')

In [69]:
spark.sql('''
            SELECT title, artist, COUNT(*), ROUND(AVG(rank),2), MIN(week_of) as week_entered
            FROM test
            GROUP BY 1,2
            ORDER BY 3 DESC
            ''').show()

+--------------------+--------------------+--------+-----------------------------------+------------+
|               title|              artist|count(1)|round(avg(CAST(rank AS BIGINT)), 2)|week_entered|
+--------------------+--------------------+--------+-----------------------------------+------------+
|             perfect|          ed sheeran|     110|                              48.09|  2017-03-10|
|       mr brightside|             killers|     105|                              84.86|  2015-04-19|
|        shape of you|          ed sheeran|      97|                              42.52|  2017-01-13|
|   thinking out loud|          ed sheeran|      90|                              56.27|  2015-01-11|
|             shotgun|         george ezra|      87|                              35.99|  2018-03-30|
|          chandelier|                 sia|      87|                              64.46|  2015-01-11|
|             lean on|major lazer ft mo...|      79|                              

In [73]:
new_songs_2015_2019 = spark.sql('''
            SELECT *
            FROM (SELECT title, artist, COUNT(*), ROUND(AVG(rank),2), MIN(week_of) as week_entered
                FROM test
                GROUP BY 1,2
                ORDER BY 3 DESC)
            WHERE week_entered > "2015-01-11"''')

In [74]:
new_songs_2015_2019.createOrReplaceTempView('test2')

In [53]:
spark.sql('''
            SELECT AVG(count)
            FROM (SELECT title, artist, COUNT(*) as count
                FROM test
                GROUP BY 1,2
                HAVING count > 1
                ORDER BY 3)
            ''').show()

+------------------+
|        avg(count)|
+------------------+
|13.223092998955067|
+------------------+



In [None]:
spark.sql()