In [3]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [4]:
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [5]:
os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']

In [6]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [7]:
spark

In [66]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date
songSchema = R([
    Fld("artist_id",Str()),
    Fld("artist_latitude",Str()),
    Fld("artist_location",Str()),
    Fld("artist_longitude",Str()),
    Fld("artist_name",Str()),
    Fld("duration",Dbl()),
    Fld("num_songs",Int()),
    Fld("song_id",Str()),
    Fld("title",Str()),
    Fld("year",Int())
])

In [8]:
song_data = '/home/workspace/data/song-data/song_data/*/*/*/*.json'
#song_data = 's3a://udacity-dend/song_data/*/*/*/*.json'


#/A/B/C/TRABCEI128F424C983.json

In [9]:
df = spark.read.json(song_data)

In [10]:
df.printSchema()
#df.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [11]:
df.count()

71

In [14]:
songs_table = df.select("song_id", "title", "artist_id", "year", "duration").distinct()

In [15]:
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [16]:
artists_table = df.select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude").distinct()

In [17]:
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)



In [12]:
log_data ='/home/workspace/data/log-data/*.json'
#log_data ='s3a://udacity-dend/log_data/2018/11/*.json'

In [21]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str,Long as Lng, IntegerType as Int, DateType as Date
logSchema = R([
    Fld("artist",Str()),
    Fld("auth",Str()),
    Fld("firstName",Str()),
    Fld("gender",Str()),
    Fld("itemInSession",Int()),
    Fld("lastName",Str()),
    Fld("length",Dbl()),
    Fld("level",Str()),
    Fld("location",Str()),
    Fld("method",Str()),
    Fld("page",Str()),
    Fld("registration",Dbl()),
    Fld("sessionId",Int()),
    Fld("song",Str()),
    Fld("status",Int()),
    Fld("ts",Lng()),
    Fld("userAgent",Str()),
    Fld("usedId",Str())
])

ImportError: cannot import name 'Long'

In [13]:
df_log = spark.read.json(log_data)

In [14]:
df_log.printSchema()
#df_log.show(2)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [15]:
df_log = df_log.filter(df_log.page == 'NextSong')

In [95]:
users_table = df_log.select("userId", "firstName", "lastName", "gender", "level").distinct()

In [96]:
users_table.printSchema()
users_table.show(5)

root
 |-- userId: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    57|Katherine|     Gay|     F| free|
|    84|  Shakira|    Hunt|     F| free|
|    22|     Sean|  Wilson|     F| free|
|    52| Theodore|   Smith|     M| free|
|    80|    Tegan|  Levine|     F| paid|
+------+---------+--------+------+-----+
only showing top 5 rows



In [97]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [155]:
df_log = spark.read.json(log_data)
df_log.show(1)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+----

In [17]:
import pyspark.sql.functions as F
from pyspark.sql import types as T#IntegerType,TimestampType, DateType,StringType
from datetime import datetime
get_timestamp = F.udf(lambda x: datetime.fromtimestamp( (x/1000.0) ), T.TimestampType()) 
get_datetime = F.udf(lambda x: datetime.date(datetime.fromtimestamp( (x/1000.0) )), T.DateType())

In [18]:
#df_log = df_log.withColumn("ts_date",F.from_unixtime(F.col("ts")/1000,'yyyy-MM-dd HH:mm:ss'))
#df_log = df_log.withColumn("ts_date",colsDate("ts")).head(10)
df_log = df_log.withColumn("timestamp",get_timestamp("ts"))
df_log = df_log.withColumn("datetimecolumn",get_datetime("ts"))

In [19]:
df_log.select("ts","timestamp","datetimecolumn").head(2)

[Row(ts=1542241826796, timestamp=datetime.datetime(2018, 11, 15, 0, 30, 26, 796000), datetimecolumn=datetime.date(2018, 11, 15)),
 Row(ts=1542242481796, timestamp=datetime.datetime(2018, 11, 15, 0, 41, 21, 796000), datetimecolumn=datetime.date(2018, 11, 15))]

In [20]:
df_time = df_log.select("ts","timestamp","datetimecolumn").distinct()

In [21]:
df_time.show(5)

+-------------+--------------------+--------------+
|           ts|           timestamp|datetimecolumn|
+-------------+--------------------+--------------+
|1542281719796|2018-11-15 11:35:...|    2018-11-15|
|1542282488796|2018-11-15 11:48:...|    2018-11-15|
|1542302979796|2018-11-15 17:29:...|    2018-11-15|
|1542317145796|2018-11-15 21:25:...|    2018-11-15|
|1542317497796|2018-11-15 21:31:...|    2018-11-15|
+-------------+--------------------+--------------+
only showing top 5 rows



In [22]:
df_time.printSchema()

root
 |-- ts: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetimecolumn: date (nullable = true)



In [23]:
#hour,day,week,month,year,weekday
df_time = df_time.withColumn("hour",F.hour(F.col("timestamp")))\
           .withColumn("day",F.dayofyear(F.col("timestamp")))\
           .withColumn("week",F.weekofyear(F.col("timestamp")))\
           .withColumn("month",F.month(F.col("timestamp")))\
           .withColumn("year",F.year(F.col("timestamp")))\
           .withColumn('weekday', F.date_format(F.col("timestamp"), 'EEEE'))

In [24]:
df_time.head(5)

[Row(ts=1542281719796, timestamp=datetime.datetime(2018, 11, 15, 11, 35, 19, 796000), datetimecolumn=datetime.date(2018, 11, 15), hour=11, day=319, week=46, month=11, year=2018, weekday='Thursday'),
 Row(ts=1542282488796, timestamp=datetime.datetime(2018, 11, 15, 11, 48, 8, 796000), datetimecolumn=datetime.date(2018, 11, 15), hour=11, day=319, week=46, month=11, year=2018, weekday='Thursday'),
 Row(ts=1542302979796, timestamp=datetime.datetime(2018, 11, 15, 17, 29, 39, 796000), datetimecolumn=datetime.date(2018, 11, 15), hour=17, day=319, week=46, month=11, year=2018, weekday='Thursday'),
 Row(ts=1542317145796, timestamp=datetime.datetime(2018, 11, 15, 21, 25, 45, 796000), datetimecolumn=datetime.date(2018, 11, 15), hour=21, day=319, week=46, month=11, year=2018, weekday='Thursday'),
 Row(ts=1542317497796, timestamp=datetime.datetime(2018, 11, 15, 21, 31, 37, 796000), datetimecolumn=datetime.date(2018, 11, 15), hour=21, day=319, week=46, month=11, year=2018, weekday='Thursday')]

In [25]:
time_table = df_time.select("ts","hour","day","week","month","year","weekday")

In [26]:
time_table.printSchema()

root
 |-- ts: long (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: string (nullable = true)



In [27]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetimecolumn: date (nullable = true)



In [45]:
#song_df = df_log.join(df, (df.artist_name == df_log.artist) & (df_log.song == df.title) & (df_log.length == df.duration), how='inner').select("ts","userId", "level", "song_id", "artist_id", "sessionId", "location", "userAgent")

In [36]:
songplays_df= df_log.join(df, (df_log.artist == df.artist_name)\
                                        & (df_log.song == df.title)\
                                        & (df_log.length == df.duration))\
                              .select(time_table.ts,"userId", "level", "song_id", 
                                      "artist_id", "sessionId", "location", "userAgent")

In [38]:
songplays_table = df_log.join(df, (df_log.artist == df.artist_name)\
                                        & (df_log.song == df.title)\
                                        & (df_log.length == df.duration))\
                              .join(time_table,(df_log.ts == time_table.ts))\
                              .select(time_table.ts,"userId", "level", "song_id", 
                                      "artist_id", "sessionId", "location", "userAgent",time_table.year,"month")


In [39]:
songplays_table.show(5)

+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+----+-----+
|           ts|userId|level|           song_id|         artist_id|sessionId|            location|           userAgent|year|month|
+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+----+-----+
|1542837407796|    15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|      818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+----+-----+



In [179]:
df.printSchema()  # Song - artist_name,title,duration

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [180]:
df_log.printSchema()  # Log - artist,song,length

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetimecolumn: date (nullable = true)



In [40]:
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F
songplays_table = songplays_table.withColumn("idx", F.monotonically_increasing_id())
window = W.orderBy(F.col('idx'))
#windowSpec = W.orderBy("idx")
songplays_table = songplays_table.withColumn("idx2", F.row_number().over(window))

In [41]:
songplays_table.show(5)

+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+----+-----+---+----+
|           ts|userId|level|           song_id|         artist_id|sessionId|            location|           userAgent|year|month|idx|idx2|
+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+----+-----+---+----+
|1542837407796|    15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|      818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|  0|   1|
+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+----+-----+---+----+



In [184]:
song_df.printSchema()

root
 |-- ts: long (nullable = true)
 |-- userId: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- idx: long (nullable = false)
 |-- idx2: integer (nullable = true)



In [185]:
output_data = "/home/workspace/output/"

In [186]:
song_df.write.partitionBy("year","month").mode("overwrite")\
                                                     .parquet(output_data+"songplays1.parquet")

In [2]:
spark.stop()

NameError: name 'spark' is not defined