In [1]:
import findspark
import os
import glob
# Let findspark locate the local Spark installation and make `pyspark`
# importable; this runs before the `pyspark` imports in the next cell.
findspark.init()

In [2]:
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType, DataType, IntegerType
from pyspark.sql.window import Window

In [3]:
# Start (or reuse) the Spark session that every later cell goes through.
spark = (
    SparkSession.builder
    .appName('test_spark')
    .getOrCreate()
)

In [4]:
def get_files(filepath):
    """Recursively collect the absolute paths of all ``*.json`` files under a directory.

    Args:
        filepath: Root directory to search; every sub-directory is visited.

    Returns:
        A sorted list of absolute paths. The list is empty when the directory
        does not exist or contains no matching entries.
    """
    all_files = []
    for root, _dirs, _names in os.walk(filepath):
        # The pattern includes the dot so that entries whose name merely ends
        # in "json" (e.g. "notjson" or a folder called "archive_json") are not
        # mistaken for JSON data files.
        matches = glob.glob(os.path.join(root, '*.json'))
        all_files.extend(os.path.abspath(match) for match in matches)
    # Sort so the result does not depend on filesystem listing order.
    return sorted(all_files)

In [5]:
# Absolute paths of every JSON file found under data/song_data.
song_files = get_files('data/song_data')

In [6]:
# Read all collected song files into one DataFrame.
df = (
    spark.read
    .option("inferSchema", "True")
    .json(song_files)
)

In [7]:
# Preview the raw song data and the columns it provides.
df.show()

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                    |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|         Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|   Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.7145

In [8]:
# Songs table: keep only the song-level columns of the song data.
song_columns = ('song_id', 'title', 'artist_id', 'year', 'duration')
songs_table = df.select(*song_columns)
songs_table.show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
|SONWXQJ12A8C134D94|The Ballad Of Sle...|ARNF6401187FB57032|1994|  305.162|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|ARLTWXK1187FB5A3F8|   0|326.00771|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|ARPFHN61187FB575F6|   0|279.97995|
|SODUJBS12A8C132150|Wessex Loses a Bride|ARI2JSK1187FB496EF|   0|111.62077|
|SOBZBAZ12A6D4F8742|      Spanish Grease|AROUOZZ1187B9ABE51|1997|168.25424|
|SOGXHEG12AB

In [None]:
# Persist the songs table as parquet, partitioned by year and artist_id.
# mode='overwrite' keeps the cell re-runnable: with the default save mode the
# write raises as soon as 'data/songs' already exists from an earlier run.
songs_table.write.parquet('data/songs', mode='overwrite', partitionBy=['year', 'artist_id'])

In [10]:
#extract columns to create artists table
columns = ['artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
columns = [col + ' as ' + col.replace('artist_', '') for col in columns]
artists_table = df.selectExpr('artist_id', *columns)

In [11]:
# Preview the artists table with its renamed columns.
artists_table.show()

+------------------+--------------------+--------------------+--------+----------+
|         artist_id|                name|            location|latitude| longitude|
+------------------+--------------------+--------------------+--------+----------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |    null|      null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|    null|      null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|40.82624| -74.47995|
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|40.71455| -74.00712|
|ARDNS031187B9924F0|          Tim Wilson|             Georgia|32.67828| -83.22295|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|40.79086| -73.96644|
|ARLTWXK1187FB5A3F8|         King Curtis|      Fort Worth, TX|32.74863| -97.32925|
|ARPFHN61187FB575F6|         Lupe Fiasco|         Chicago, IL|41.88415| -87.63241|
|ARI2JSK1187FB496EF|Nick Ingman;Gavyn...|     London, England|51.50632|  -0.12714|
|ARO

In [None]:
# Persist the artists table as parquet.
# mode='overwrite' keeps the cell re-runnable: with the default save mode the
# write raises as soon as 'data/artists' already exists from an earlier run.
artists_table.write.parquet('data/artists', mode='overwrite')

In [12]:
# Paths of the event-log JSON files (the variable keeps its historical name
# `song_data` because the next cell reads from it).
# A forward slash is used as the separator: the previous 'data\log_data'
# relied on the invalid escape sequence '\l' and is not a directory path on
# POSIX systems, whereas '/' works on every platform.
song_data = get_files('data/log_data')

In [13]:
# Load the event logs. Despite its name, `song_data` holds the log-file paths
# collected in the previous cell; from here on `df` refers to the log events,
# no longer to the song data.
df = spark.read.json(song_data)

In [15]:
# Add an integer-typed `user_id` column derived from the raw `userId` field.
user_id_as_int = F.col('userId').cast(IntegerType())
df = df.withColumn('user_id', user_id_as_int)

In [16]:
# Keep only the events whose page is 'NextSong'.
df = df.filter(F.col('page') == 'NextSong')

In [17]:
# User attributes per event; `ts` is carried along so the most recent event
# of each user can be identified afterwards.
users_table = df.selectExpr(
    'user_id',
    'firstName as first_name',
    'lastName as last_name',
    'gender',
    'level',
    'ts',
)
# Per-user window, newest event first.
users_window = Window.partitionBy('user_id').orderBy(F.col('ts').desc())

In [21]:
# Rank each user's events (1 = newest), keep that newest row only, then drop
# the helper columns so one row per user remains.
ranked_users = users_table.withColumn('row_number', F.row_number().over(users_window))
users_table = (
    ranked_users
    .filter(F.col('row_number') == 1)
    .drop('ts', 'row_number')
)

In [22]:
# Persist the users table as parquet.
# mode='overwrite' keeps the cell re-runnable: with the default save mode the
# write raises as soon as 'data/users' already exists from an earlier run.
users_table.write.parquet('data/users', mode='overwrite')

In [23]:
def get_timestamp(ts):
    """Build a timestamp column from an epoch-milliseconds column.

    Uses a native Spark cast instead of a Python UDF. The former UDF raised a
    TypeError on a null `ts`, and it formatted the value as a local-time ISO
    string that Spark then had to parse again in the session time zone.
    Casting epoch seconds directly yields null for null input and does not
    depend on the Python process's time zone.

    Args:
        ts: Column name (str) or Column holding milliseconds since the Unix
            epoch (assumed from the original `ts / 1000` conversion).

    Returns:
        A Column of TimestampType.
    """
    ts_col = F.col(ts) if isinstance(ts, str) else ts
    # A numeric value cast to timestamp is read as seconds since the epoch;
    # dividing by 1000 keeps the millisecond part as the fraction.
    return (ts_col / 1000).cast(TimestampType())


df = df.withColumn('start_time', get_timestamp('ts'))

In [25]:
# Time table: one row per distinct start_time plus its calendar parts.
# Several events can share the same timestamp, so duplicates are dropped
# before deriving the parts; otherwise the table would repeat rows.
time_table = (
    df.select('start_time')
    .dropDuplicates()
    .withColumn('hour', F.hour('start_time'))
    .withColumn('day', F.dayofmonth('start_time'))
    .withColumn('week', F.weekofyear('start_time'))
    .withColumn('month', F.month('start_time'))
    .withColumn('year', F.year('start_time'))
    .withColumn('weekday', F.dayofweek('start_time'))
)

In [26]:
# Persist the time table as parquet, partitioned by year and month.
# mode='overwrite' keeps the cell re-runnable: with the default save mode the
# write raises as soon as 'data/time' already exists from an earlier run.
time_table.write.parquet('data/time', mode='overwrite', partitionBy=['year', 'month'])

In [27]:
# Re-read the song files into their own frame: `df` now holds the log events,
# and the songplays join below needs the song data as well.
song_df = spark.read.json(song_files)

In [28]:
# Expose the song data to Spark SQL under the name 'songs'.
song_df.createOrReplaceTempView('songs')

# Order the events by `ts`, tag each row with a generated `songplay_id`, and
# expose the result to Spark SQL under the name 'events'.
df = df.orderBy('ts').withColumn('songplay_id', F.monotonically_increasing_id())
df.createOrReplaceTempView('events')

In [32]:
# Build the songplays table from the 'events' and 'songs' temp views.
# The LEFT JOIN keeps every event; song_id and artist_id are NULL when no song
# matches on title, artist name and a length/duration difference below 2.
# year and month are derived from start_time and are used as partition columns
# when the table is written.
songplays_table = spark.sql("""
        SELECT
            e.songplay_id,
            e.start_time,
            e.user_id,
            e.level,
            s.song_id,
            s.artist_id,
            e.sessionId as session_id,
            e.location,
            e.userAgent as user_agent,
            year(e.start_time) as year,
            month(e.start_time) as month
        FROM events e
        LEFT JOIN songs s ON
            e.song = s.title AND
            e.artist = s.artist_name AND
            ABS(e.length - s.duration) < 2
    """)

In [33]:
# Persist the songplays table as parquet, partitioned by year and month.
# mode='overwrite' keeps the cell re-runnable: with the default save mode the
# write raises as soon as 'data/songplays' already exists from an earlier run.
songplays_table.write.parquet('data/songplays', mode='overwrite', partitionBy=['year', 'month'])