In [1]:
import configparser
from datetime import datetime
import os
import itertools
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, to_timestamp
from pyspark.sql.types import TimestampType, DateType
from pyspark.sql import functions as F
from pyspark.sql import types as T

from column_names import *
from sql_queries import *

In [2]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [3]:
spark = create_spark_session()

In [4]:
def process_song_data(spark, input_data, output_data):
    # get filepath to song data file
    song_data = input_data + "song_data/*/*/*/*.json"

    # read song data file
    raw_song_df = spark.read.json(song_data)
    print('Printing Raw Song Schema ~')
    raw_song_df.printSchema()
    raw_song_df.show(5, truncate=False)

    # extract columns to create songs table - PySpark DF
    print('Printing Song Schema ~')
    print(songs)
    songs_pyspark_df = raw_song_df.select(
        [col for col in songs.values()]
    )
    print('Songs - PySpark DF')
    songs_pyspark_df.show()
    
    # write songs table to parquet files partitioned by year and artist
    songs_pyspark_df.write.partitionBy('year', 'artist_id').parquet(output_data + 'songs')

    # extract columns to create artists table - PySpark DF
    print('Printing Artist Schema ~')
    print(artists)
    artists_pyspark_df = raw_song_df.select(
        [col for col in artists.values()]
    )
    print('Artists - PySpark DF')
    artists_pyspark_df.show()

    # write artists table to parquet files
    artists_pyspark_df.write.parquet(output_data + 'artists')

In [5]:
input_data = f'{os.getcwd()}/'
output_data = ""
process_song_data(spark, input_data, output_data)

Printing Raw Song Schema ~
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+---------------------------+----------------+---------------------+---------+---------+------------------+-------------------------------------------+----+
|artist_id         |artist_latitude|artist_location            |artist_longitude|artist_name          |duration |num_songs|song_id           |title                                      |year|
+------------------+---------------+---------------------------+----------------+---------------------+---------+---------+------------------+--------------------------

AnalysisException: 'path file:/home/jovyan/songs already exists.;'

In [13]:
def process_log_data(spark, input_data, output_data):
    # get filepath to log data file
    log_data = input_data + 'log_data'

    # read log data file
    raw_log_df = spark.read.json(log_data)
    print('Logging Raw Log Schema ~')
    raw_log_df.printSchema()
    raw_log_df.show(5, truncate=False)

    # filter by actions for song plays
    songplay_log_pyspark_df = raw_log_df.filter(F.col('page') == 'NextSong')
    songplay_log_pyspark_df.show()

    # extract columns for users table    
    users_pyspark_df = raw_log_df.select(
        [col for col in users.values()]
    )
    print('Logging users table - PySpark')
    users_pyspark_df.show(5, truncate=False)
    
    # write users table to parquet files
    users_pyspark_df.write.parquet(input_data + 'users')

    # create timestamp column from original timestamp column
    tsFormat = "yyyy-MM-dd HH:MM:ss z"
    # converting ts to timestamp format
    time_table = songplay_log_pyspark_df.withColumn(
        'timestamp', to_timestamp(
            F.date_format(
                (songplay_log_pyspark_df.ts/1000).cast(dataType=T.TimestampType()), tsFormat), tsFormat))
    print('Logging Songplay Log Schema ~ After creating timestamp column')
    time_table.printSchema()
    
    # extract columns to create time table
    time_table = time_table.select(
        F.col('timestamp').alias('start_time'),
        F.year(F.col('timestamp')).alias('year'),
        F.month(F.col('timestamp')).alias('month'),
        F.weekofyear(F.col('timestamp')).alias('weekofyear'),
        F.dayofmonth(F.col('timestamp')).alias('dayofmonth'),
        F.hour(F.col('timestamp')).alias('hour')
    )
    print('Logging Songplay Log Schema ~ After creating time table')
    time_table.printSchema()
    time_table.show(5)
    
    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy("year","month").parquet(input_data + 'time')

    # get filepath to song data file
    song_data = input_data + "song_data/*/*/*/*.json"
    song_pyspark_df = spark.read.json(song_data)
    print('Logging Song Log Schema ~')
    song_pyspark_df.printSchema()
    
    # read in song data to use for songplays table
    conditions = [
        song_pyspark_df['artist_name'] == songplay_log_pyspark_df['artist'],
        song_pyspark_df['title'] == songplay_log_pyspark_df['song'],
        song_pyspark_df['duration'] == songplay_log_pyspark_df['length']
        ]
    songplays_table = song_pyspark_df.join(
        songplay_log_pyspark_df,
        conditions
    ).withColumn(
        'songplay_id', F.monotonically_increasing_id()
    ).withColumn(
        'start_time', to_timestamp(F.date_format(
                (F.col('ts')/1000).cast(dataType=T.TimestampType()), tsFormat), tsFormat)
    ).select(
        'songplay_id',
        'start_time',
        F.col('userId').alias('user_id'),
        'level',
        'song_id',
        'artist_id',
        F.col('sessionId').alias('session_id'),
        'location',
        F.col('userAgent').alias('user_agent'),
        F.year(F.col('start_time')).alias('year'),
        F.month(F.col('start_time')).alias('month'))
    print('Logging Songplays Schema ~')
    songplays_table.printSchema()

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy("year","month").parquet(input_data + 'songplays')

In [14]:
input_data = f'{os.getcwd()}/'
output_data = ""
process_log_data(spark, input_data, output_data)

Logging Raw Log Schema ~
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+-------------------------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------------------------------------------------------------------

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|26    |Ryan     |Smith   |M     |free |
|26    |Ryan     |Smith   |M     |free |
|26    |Ryan     |Smith   |M     |free |
|9     |Wyatt    |Scott   |M     |free |
|12    |Austin   |Rosales |M     |free |
+------+---------+--------+------+-----+
only showing top 5 rows

Logging Songplay Log Schema ~ After creating timestamp column
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = t

In [80]:
print(songplays)

{'SONGPLAY_ID': 'songplay_id', 'START_TIME': 'start_time', 'USER_ID': 'user_id', 'LEVEL': 'level', 'SONG_ID': 'song_id', 'ARTIST_ID': 'artist_id', 'SESSION_ID': 'session_id', 'LOCATION': 'location', 'USER_AGENT': 'user_agent'}


In [81]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark)

In [82]:
df = sqlContext.read.parquet('./users/part-00000-16c7b49e-8652-4fbe-8ced-50762b14b0df-c000.snappy.parquet')

In [83]:
df.head()

Row(userId='26', firstName='Ryan', lastName='Smith', gender='M', level='free')

In [87]:
# To read multiple files
user_df = sqlContext.read.parquet('./users')
user_df.head(10)

[Row(userId='26', firstName='Ryan', lastName='Smith', gender='M', level='free'),
 Row(userId='26', firstName='Ryan', lastName='Smith', gender='M', level='free'),
 Row(userId='26', firstName='Ryan', lastName='Smith', gender='M', level='free'),
 Row(userId='9', firstName='Wyatt', lastName='Scott', gender='M', level='free'),
 Row(userId='12', firstName='Austin', lastName='Rosales', gender='M', level='free'),
 Row(userId='61', firstName='Samuel', lastName='Gonzalez', gender='M', level='free'),
 Row(userId='61', firstName='Samuel', lastName='Gonzalez', gender='M', level='free'),
 Row(userId='', firstName=None, lastName=None, gender=None, level='paid'),
 Row(userId='80', firstName='Tegan', lastName='Levine', gender='F', level='paid'),
 Row(userId='80', firstName='Tegan', lastName='Levine', gender='F', level='paid')]

In [88]:
print(type(user_df))

<class 'pyspark.sql.dataframe.DataFrame'>


### Run all outside

In [17]:
input_data = f'{os.getcwd()}/'
output_data = ""
# process_song_data(spark, input_data, output_data)

In [19]:
# get filepath to song data file
song_data = input_data + "song_data/*/*/*/*.json"

# read song data file
raw_song_df = spark.read.json(song_data)
print('Printing Raw Song Schema ~')
raw_song_df.printSchema()
raw_song_df.show(5, truncate=False)

# extract columns to create songs table - PySpark DF
print('Printing Song Schema ~')
print(songs)
songs_pyspark_df = raw_song_df.select(
    [col for col in songs.values()]
)
print('Songs - PySpark DF')
songs_pyspark_df.show()

# write songs table to parquet files partitioned by year and artist
songs_pyspark_df.write.partitionBy('year', 'artist_id').parquet(output_data + 'songs')

# extract columns to create artists table - PySpark DF
print('Printing Artist Schema ~')
print(artists)
artists_pyspark_df = raw_song_df.select(
    [col for col in artists.values()]
)
print('Artists - PySpark DF')
artists_pyspark_df.show()

# write artists table to parquet files
artists_pyspark_df.write.parquet(output_data + 'artists')

Printing Raw Song Schema ~
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+---------------------------+----------------+---------------------+---------+---------+------------------+-------------------------------------------+----+
|artist_id         |artist_latitude|artist_location            |artist_longitude|artist_name          |duration |num_songs|song_id           |title                                      |year|
+------------------+---------------+---------------------------+----------------+---------------------+---------+---------+------------------+--------------------------

In [6]:
input_data = f'{os.getcwd()}/'
output_data = ""
# process_log_data(spark, input_data, output_data)

In [21]:
# get filepath to log data file
log_data = input_data + 'log_data'

# read log data file
raw_log_df = spark.read.json(log_data)
print('Logging Raw Log Schema ~')
raw_log_df.printSchema()
raw_log_df.show(5, truncate=False)

# filter by actions for song plays
songplay_log_pyspark_df = raw_log_df.filter(F.col('page') == 'NextSong')
songplay_log_pyspark_df.show()

# extract columns for users table    
users_pyspark_df = raw_log_df.select(
    [col for col in users.values()]
)
print('Logging users table - PySpark')
users_pyspark_df.show(5, truncate=False)

# write users table to parquet files
users_pyspark_df.write.parquet(input_data + 'users')

# create timestamp column from original timestamp column
tsFormat = "yyyy-MM-dd HH:MM:ss z"
# converting ts to timestamp format
time_table = songplay_log_pyspark_df.withColumn(
    'timestamp', to_timestamp(
        F.date_format(
            (songplay_log_pyspark_df.ts/1000).cast(dataType=T.TimestampType()), tsFormat), tsFormat))
print('Logging Songplay Log Schema ~ After creating timestamp column')
time_table.printSchema()

# extract columns to create time table
time_table = time_table.select(
    F.col('timestamp').alias('start_time'),
    F.year(F.col('timestamp')).alias('year'),
    F.month(F.col('timestamp')).alias('month'),
    F.weekofyear(F.col('timestamp')).alias('weekofyear'),
    F.dayofmonth(F.col('timestamp')).alias('dayofmonth'),
    F.hour(F.col('timestamp')).alias('hour')
)
print('Logging Songplay Log Schema ~ After creating time table')
time_table.printSchema()
time_table.show(5)

# write time table to parquet files partitioned by year and month
time_table.write.partitionBy("year","month").parquet(input_data + 'time')

# get filepath to song data file
song_data = input_data + "song_data/*/*/*/*.json"
song_pyspark_df = spark.read.json(song_data)
print('Logging Song Log Schema ~')
song_pyspark_df.printSchema()

# read in song data to use for songplays table
conditions = [
    song_pyspark_df['artist_name'] == songplay_log_pyspark_df['artist'],
    song_pyspark_df['title'] == songplay_log_pyspark_df['song'],
    song_pyspark_df['duration'] == songplay_log_pyspark_df['length']
    ]
songplays_table = song_pyspark_df.join(
    songplay_log_pyspark_df,
    conditions
).withColumn(
    'songplay_id', F.monotonically_increasing_id()
).withColumn(
    'start_time', to_timestamp(F.date_format(
            (F.col('ts')/1000).cast(dataType=T.TimestampType()), tsFormat), tsFormat)
).select(
    'songplay_id',
    'start_time',
    F.col('userId').alias('user_id'),
    'level',
    'song_id',
    'artist_id',
    F.col('sessionId').alias('session_id'),
    'location',
    F.col('userAgent').alias('user_agent'),
    F.year(F.col('start_time')).alias('year'),
    F.month(F.col('start_time')).alias('month'))
print('Logging Songplays Schema ~')
songplays_table.printSchema()

# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy("year","month").parquet(input_data + 'songplays')

Logging Raw Log Schema ~
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+-------------------------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------------------------------------------------------------------

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|26    |Ryan     |Smith   |M     |free |
|26    |Ryan     |Smith   |M     |free |
|26    |Ryan     |Smith   |M     |free |
|9     |Wyatt    |Scott   |M     |free |
|12    |Austin   |Rosales |M     |free |
+------+---------+--------+------+-----+
only showing top 5 rows

Logging Songplay Log Schema ~ After creating timestamp column
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = t

In [26]:
song_pyspark_df.head(5)

[Row(artist_id='ARKFYS91187B98E58F', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Jeff And Sheri Easter', duration=267.7024, num_songs=1, song_id='SOYMRWW12A6D4FAB14', title='The Moon And I (Ordinary Day Album Version)', year=0),
 Row(artist_id='AR10USD1187B99F3F1', artist_latitude=None, artist_location='Burlington, Ontario, Canada', artist_longitude=None, artist_name='Tweeterfriendly Music', duration=189.57016, num_songs=1, song_id='SOHKNRJ12A6701D1F8', title='Drop of Rain', year=0),
 Row(artist_id='ARGSJW91187B9B1D6B', artist_latitude=35.21962, artist_location='North Carolina', artist_longitude=-80.01955, artist_name='JennyAnyKind', duration=218.77506, num_songs=1, song_id='SOQHXMF12AB0182363', title='Young Boy Blues', year=0),
 Row(artist_id='ARMJAGH1187FB546F3', artist_latitude=35.14968, artist_location='Memphis, TN', artist_longitude=-90.04892, artist_name='The Box Tops', duration=148.03546, num_songs=1, song_id='SOCIWDW12A8C13D406', title='Soul De

In [29]:
songplay_log_pyspark_df.head()

Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', start_time=datetime.datetime(2018, 11, 15, 0, 0, 26))

### Use Pyspark SQL to form table

In [7]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark)

In [8]:
song_pyspark_df = sqlContext.read.parquet('./songs')
song_pyspark_df.head(5)

[Row(song_id='SOYMRWW12A6D4FAB14', title='The Moon And I (Ordinary Day Album Version)', duration=267.7024, year=0, artist_id='ARKFYS91187B98E58F'),
 Row(song_id='SOUDSGM12AC9618304', title='Insatiable (Instrumental Version)', duration=266.39628, year=0, artist_id='ARNTLGG11E2835DDB9'),
 Row(song_id='SOMJBYD12A6D4F8557', title='Keepin It Real (Skit)', duration=114.78159, year=0, artist_id='ARD0S291187B9B7BF5'),
 Row(song_id='SOMZWCG12A8C13C480', title="I Didn't Mean To", duration=218.93179, year=0, artist_id='ARD7TVE1187B99BFB1'),
 Row(song_id='SONHOTT12A8C13493C', title='Something Girls', duration=233.40363, year=1982, artist_id='AR7G5I41187FB4CE6C')]

In [9]:
artists_pyspark_df = sqlContext.read.parquet('./artists')
artists_pyspark_df.head(5)

[Row(artist_id='ARKFYS91187B98E58F', artist_name='Jeff And Sheri Easter', artist_location='', artist_latitude=None, artist_longitude=None),
 Row(artist_id='AR10USD1187B99F3F1', artist_name='Tweeterfriendly Music', artist_location='Burlington, Ontario, Canada', artist_latitude=None, artist_longitude=None),
 Row(artist_id='ARGSJW91187B9B1D6B', artist_name='JennyAnyKind', artist_location='North Carolina', artist_latitude=35.21962, artist_longitude=-80.01955),
 Row(artist_id='ARMJAGH1187FB546F3', artist_name='The Box Tops', artist_location='Memphis, TN', artist_latitude=35.14968, artist_longitude=-90.04892),
 Row(artist_id='AR7G5I41187FB4CE6C', artist_name='Adam Ant', artist_location='London, England', artist_latitude=None, artist_longitude=None)]

In [10]:
time_table = sqlContext.read.parquet('./time')
time_table.head(5)

[Row(start_time=datetime.datetime(2018, 11, 15, 0, 0, 26), weekofyear=46, dayofmonth=15, hour=0, year=2018, month=11),
 Row(start_time=datetime.datetime(2018, 11, 15, 0, 0, 21), weekofyear=46, dayofmonth=15, hour=0, year=2018, month=11),
 Row(start_time=datetime.datetime(2018, 11, 15, 0, 0, 41), weekofyear=46, dayofmonth=15, hour=0, year=2018, month=11),
 Row(start_time=datetime.datetime(2018, 11, 15, 3, 0, 9), weekofyear=46, dayofmonth=15, hour=3, year=2018, month=11),
 Row(start_time=datetime.datetime(2018, 11, 15, 5, 0, 55), weekofyear=46, dayofmonth=15, hour=5, year=2018, month=11)]

In [12]:
# get filepath to log data file
log_data = input_data + 'log_data'

# read log data file
raw_log_df = spark.read.json(log_data)
print('Logging Raw Log Schema ~')
raw_log_df.printSchema()
raw_log_df.show(5, truncate=False)

# filter by actions for song plays
songplay_log_pyspark_df = raw_log_df.filter(F.col('page') == 'NextSong')
songplay_log_pyspark_df.show()

Logging Raw Log Schema ~
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+-------------------------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------------------------------------------------------------------

In [14]:
# create timestamp column from original timestamp column
tsFormat = "yyyy-MM-dd HH:MM:ss z"

songplay_log_pyspark_df = songplay_log_pyspark_df.withColumn(
    "start_time",
    to_timestamp(F.date_format(
            (F.col('ts')/1000).cast(dataType=T.TimestampType()), tsFormat), tsFormat)
)

In [15]:
songplay_log_pyspark_df.createOrReplaceTempView("events_table")
song_pyspark_df.createOrReplaceTempView("songs_table")
artists_pyspark_df.createOrReplaceTempView("artists_table")
time_table.createOrReplaceTempView("time_table")

In [16]:
songplays_table = spark.sql(
    """
    SELECT 
        e.start_time,
        e.userId,
        e.level,
        s.song_id,
        s.artist_id,
        e.sessionId,
        e.userAgent,
        t.year,
        t.month
    FROM events_table e 
    JOIN songs_table s ON e.song = s.title AND e.length = s.duration 
    JOIN artists_table a ON e.artist = a.artist_name AND a.artist_id = s.artist_id
    JOIN time_table t ON e.start_time = t.start_time
    """
)

In [17]:
songplays_table.head(5)

[]

In [36]:
songplay_log_pyspark_df.head(5)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', start_time=datetime.datetime(2018, 11, 15, 0, 0, 26)),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"