In [None]:
from pyspark.sql import SparkSession
import os
import configparser
import pyspark.sql.functions as F
import pyspark.sql.types as Ptype

In [None]:
# Create (or reuse) the Spark session, asking Spark to pull in the
# hadoop-aws package via spark.jars.packages.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [None]:
#!unzip data/song-data.zip

# Song data

In [None]:
# Load the song JSON files; the {A,B,C} glob restricts the read to those
# sub-directories at each of the three directory levels.
df5 = spark.read.format("json").load("song_data/{A,B,C}/{A,B,C}/{A,B,C}")

In [None]:
# Number of rows loaded from the song JSON files.
df5.count()

In [None]:
# Print the column names and types of the loaded song data.
df5.printSchema()

## Artists table

In [None]:
# Exploratory step: group the song rows by the full set of artist attributes.
# No aggregation is applied here, so df_artists holds grouped data rather
# than a DataFrame until an aggregate (e.g. count) is called on it.
df_artists = (
    df5
    .select("artist_id", "artist_name", "artist_location",
            "artist_latitude", "artist_longitude")
    .groupBy("artist_id", "artist_name", "artist_location",
             "artist_latitude", "artist_longitude")
)


In [None]:
# df_artists is grouped data at this point, so count() yields one row per
# artist group with its size, and show() prints those rows.
df_artists.count().show()

In [None]:
# Artists dimension: one row per distinct combination of artist attributes,
# with the "artist_" prefix dropped from every column except artist_id.
df_artists = (
    df5
    .select(
        F.col("artist_id"),
        F.col("artist_name").alias("name"),
        F.col("artist_location").alias("location"),
        F.col("artist_latitude").alias("latitude"),
        F.col("artist_longitude").alias("longitude"),
    )
    .distinct()
)

In [None]:
# Print the artists table schema to confirm the renamed columns.
df_artists.printSchema()

In [None]:
# Preview up to 10 artist rows as a pandas DataFrame.
df_artists.limit(10).toPandas()

In [None]:
# Number of distinct artist rows.
df_artists.count()

In [None]:
# Raw song row count, for comparison with the distinct artist count.
df5.count()

## Song table

In [None]:
#song_id, title, artist_id, year, duration
# Songs dimension: the distinct rows of the five song attributes.
df_songs = (
    df5
    .select(["song_id", "title", "artist_id", "year", "duration"])
    .distinct()
)

In [None]:
# Preview up to 10 song rows as a pandas DataFrame.
df_songs.limit(10).toPandas()

In [None]:
# Print the songs table schema.
df_songs.printSchema()

In [None]:
# Number of distinct song rows.
df_songs.count()

## Write 

In [None]:
!mkdir OUT

In [None]:
# Persist the songs table as parquet, partitioned by year and then artist_id.
# "overwrite" replaces any output already present at this path.
(
    df_songs.write
    .mode("overwrite")
    .partitionBy("year", "artist_id")
    .parquet("./OUT/songs.parquet")
)

In [None]:
# List one partition directory to check the year=/artist_id= layout of the
# parquet output. NOTE(review): assumes this year/artist combination exists
# in the loaded song data - confirm.
!ls OUT/songs.parquet/year\=1987/artist_id\=ARD842G1187B997376

# Log data

In [None]:
#!mkdir log_data
#!cd ./log_data; unzip ../data/log-data.zip
#####!mv 2018-11*json log_data

In [None]:
# Load every JSON file found under the log_data/ directory.
df_log = spark.read.format("json").load("log_data/")

In [None]:
# Print the column names and types of the loaded log data.
df_log.printSchema()

In [None]:
# Number of rows loaded from the log JSON files.
df_log.count()

## Users table

In [None]:
# user_id, first_name, last_name, gender, level
# Users dimension, built from df_log in a single chain so that the cell is
# idempotent: it never reads a previous df_users, and can therefore be
# re-run safely (the former two-step version failed on re-run because
# "userId" had already been renamed to "user_id").
#
# Steps, in the same order as before:
#   1. keep the user attributes and drop duplicate rows,
#   2. cast userId to int and rename the columns to snake_case,
#   3. sort by user_id.
df_users = (
    df_log
    .select("userId", "firstName", "lastName", "gender", "level")
    .distinct()
    .select(
        F.col("userId").cast("int").alias("user_id"),
        F.col("firstName").alias("first_name"),
        F.col("lastName").alias("last_name"),
        "gender",
        "level",
    )
    .orderBy("user_id")
)

In [None]:
# Preview up to 20 user rows as a pandas DataFrame.
df_users.limit(20).toPandas()

In [None]:
# Print the users table schema.
df_users.printSchema()

## Timestamp table

In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as Ptype


In [None]:
# Add a "datetime" column derived from ts. from_unixtime expects seconds,
# hence the division by 1000 (presumably ts is in milliseconds - confirm).
t1 = df_log.withColumn(
    "datetime",
    F.from_unixtime(F.col("ts") / 1000),
)
t1

In [None]:
# Show the derived datetime next to the raw ts for up to 10 rows.
t1.select("datetime", "ts").limit(10).toPandas()

In [None]:
# Derive the calendar parts of each event from the "datetime" column.
# The chain is wrapped in parentheses rather than using backslash
# continuations: the original ended with a dangling "\" after the last
# call, which leaves the statement unterminated at the end of the cell.
t1 = (
    t1
    .withColumn("hour", F.hour("datetime"))
    .withColumn("day", F.dayofmonth("datetime"))
    .withColumn("week", F.weekofyear("datetime"))
    .withColumn("month", F.month("datetime"))
    .withColumn("year", F.year("datetime"))
    .withColumn("weekday", F.dayofweek("datetime"))
)

In [None]:
# Time dimension: the distinct rows of ts and its derived calendar parts.
df_timestamp = (
    t1
    .select("ts", "hour", "day", "week", "month", "year", "weekday")
    .distinct()
)

In [None]:
# Number of distinct timestamp rows.
df_timestamp.count()

In [None]:
# Raw log row count, for comparison with the distinct timestamp count.
df_log.count()

In [None]:
# Preview up to 10 timestamp rows as a pandas DataFrame.
df_timestamp.limit(10).toPandas()

In [None]:
# Print the timestamp table schema.
df_timestamp.printSchema()

### Fact table: songplays

In [None]:
# Register the three DataFrames as temporary views so they can be queried
# with spark.sql; an existing view with the same name is replaced.
for _view_name, _frame in (
    ("table_log", df_log),
    ("table_artists", df_artists),
    ("table_songs", df_songs),
):
    _frame.createOrReplaceTempView(_view_name)

In [None]:
#songplay_id, start_time, user_id, level, song_id, 
#artist_id, session_id, location, user_agent
# Build the songplays fact rows by matching each log event to a known
# artist and song.
#
# Two corrections over the previous query:
#   * table_artists is registered from df_artists, whose artist_name column
#     was aliased to "name", so the artist join must use art.name
#     (art.artist_name does not exist in that view).
#   * The songs join now ties the song to the matched artist through
#     artist_id. It previously repeated the artist-name condition, so a
#     title matched regardless of which artist the song belonged to.
df_songplays = spark.sql("""
    SELECT lg.ts AS start_time,
        lg.userId AS user_id,
        lg.level AS level,
        sg.song_id,
        art.artist_id,
        lg.sessionId AS session_id,
        lg.location,
        lg.userAgent AS user_agent
    FROM table_log AS lg
    JOIN table_artists AS art ON art.name = lg.artist
    JOIN table_songs AS sg ON sg.title = lg.song AND sg.artist_id = art.artist_id
""")

In [None]:
# Add a generated songplay_id column. monotonically_increasing_id() produces
# unique, increasing 64-bit ids that are not guaranteed to be consecutive.
# withColumn replaces the column if it already exists, so re-running this
# cell does not add a second songplay_id.
df_songplays = df_songplays.withColumn("songplay_id", F.monotonically_increasing_id())

In [None]:
# Column order for the fact table: songplay_id first, then every other
# column in its current order. Selecting by name (instead of inserting at
# the front and popping the last element) does not depend on songplay_id
# being the last column, so the cell stays correct when re-run after the
# columns have already been reordered.
rearrange_col = ["songplay_id"] + [
    name for name in df_songplays.schema.names if name != "songplay_id"
]
rearrange_col

In [None]:
# Reorder the fact table columns according to rearrange_col.
df_songplays = df_songplays.select(*rearrange_col)

In [None]:
# Preview up to 5 songplay rows as a pandas DataFrame.
df_songplays.limit(5).toPandas()

In [None]:
# Print the songplays table schema.
df_songplays.printSchema()