In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, unix_timestamp, dayofweek
import pyspark.sql.functions as F
import zipfile

In [2]:
#DELETE DIR
#import shutil
#shutil.rmtree('data/new_folder')

In [3]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [4]:
with zipfile.ZipFile("data/log-data.zip","r") as zip_ref:
    zip_ref.extractall("data/log-data")

In [5]:
with zipfile.ZipFile("data/song-data.zip","r") as zip_ref:
    zip_ref.extractall("data/song-data")

In [6]:
df = spark.read.json("data/song-data/*/*/*/*/*.json")

In [7]:
df.head()

Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0)

In [8]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [9]:
songs_table = df.select("song_id","title","artist_id","year","duration")

In [10]:
st = songs_table.alias('st')  

In [11]:
songs_table.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,ARDR4AC1187FB371A1,0,511.16363
1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),AREBBGV1187FB523D2,0,173.66159
2,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751
3,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,ARPBNLO1187FB3D52F,2000,43.36281
4,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,ARNF6401187FB57032,1994,305.162


In [12]:
artists_table = df.select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude")

In [13]:
at = artists_table.alias('at')  

In [14]:
at.limit(5).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARDR4AC1187FB371A1,Montserrat Caballé;Placido Domingo;Vicente Sar...,,,
1,AREBBGV1187FB523D2,Mike Jones (Featuring CJ_ Mello & Lil' Bran),"Houston, TX",,
2,ARMAC4T1187FB3FA4C,The Dillinger Escape Plan,"Morris Plains, NJ",40.82624,-74.47995
3,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712
4,ARNF6401187FB57032,Sophie B. Hawkins,"New York, NY [Manhattan]",40.79086,-73.96644


In [15]:
#st.write.partitionBy("year","artist_id").parquet("data/new_folder")

In [16]:
df2 = spark.read.json("data/log-data")

In [17]:
df2 = df2.filter("page=='NextSong'")

In [18]:
df2.toPandas().head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [19]:
df2.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [20]:
users_table = df2.select("userId", "firstName", "lastName", "gender", "level")

In [21]:
users_table.toPandas().head()

Unnamed: 0,userId,firstName,lastName,gender,level
0,26,Ryan,Smith,M,free
1,26,Ryan,Smith,M,free
2,26,Ryan,Smith,M,free
3,61,Samuel,Gonzalez,M,free
4,80,Tegan,Levine,F,paid


In [22]:
#get_time = udf(lambda x: unix_timestamp(x, 'mm/dd/yyyy HH:mm:ss'))

In [23]:
df2 = df2.withColumn("timestamp",  F.to_timestamp(df2.ts/1000))#get_time("ts"))

In [24]:
df2.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [25]:
#get_date = udf(lambda x: date_format(x, 'mm/dd/yyyy HH:mm:ss'))

In [26]:
df2 = df2.withColumn("datetime",  F.to_date(df2.timestamp))#get_time("ts"))

In [27]:
print(df2.select("timestamp").show())

+--------------------+
|           timestamp|
+--------------------+
|2018-11-15 00:30:...|
|2018-11-15 00:41:...|
|2018-11-15 00:45:...|
|2018-11-15 03:44:...|
|2018-11-15 05:48:...|
|2018-11-15 05:53:...|
|2018-11-15 05:55:...|
|2018-11-15 06:01:...|
|2018-11-15 06:07:...|
|2018-11-15 06:10:...|
|2018-11-15 06:13:...|
|2018-11-15 06:14:...|
|2018-11-15 06:17:...|
|2018-11-15 06:18:...|
|2018-11-15 06:21:...|
|2018-11-15 06:25:...|
|2018-11-15 06:29:...|
|2018-11-15 07:08:...|
|2018-11-15 07:12:...|
|2018-11-15 07:28:...|
+--------------------+
only showing top 20 rows

None


In [28]:
time_table = df2.selectExpr(["timestamp as start_time","hour(datetime)", "dayofmonth(datetime)", "weekofyear(datetime)", "month(datetime)", "year(datetime)", "dayofweek(datetime)"])

In [29]:
songplay = song_df.join(artist_df, )

AnalysisException: "cannot resolve '`song_id`' given input columns: [itemInSession, lastName, auth, registration, timestamp, ts, artist, datetime, level, status, userId, page, song, firstName, userAgent, gender, location, method, length, sessionId];;\n'Project ['song_id, 'title, 'artist_id, 'year, 'duration]\n+- Project [artist#62, auth#63, firstName#64, gender#65, itemInSession#66L, lastName#67, length#68, level#69, location#70, method#71, page#72, registration#73, sessionId#74L, song#75, status#76L, ts#77L, userAgent#78, userId#79, timestamp#127, to_date(timestamp#127, None) AS datetime#147]\n   +- Project [artist#62, auth#63, firstName#64, gender#65, itemInSession#66L, lastName#67, length#68, level#69, location#70, method#71, page#72, registration#73, sessionId#74L, song#75, status#76L, ts#77L, userAgent#78, userId#79, to_timestamp((ts#77L / 1000), None) AS timestamp#127]\n      +- Filter (page#72 = NextSong)\n         +- Relation[artist#62,auth#63,firstName#64,gender#65,itemInSession#66L,lastName#67,length#68,level#69,location#70,method#71,page#72,registration#73,sessionId#74L,song#75,status#76L,ts#77L,userAgent#78,userId#79] json\n"