# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [2]:
def process_csv(dir, paths):    
    csvrows = []

    # cache all the csv rows from each file in memory
    for f in paths:
        file = dir + '/' + f
        with open(file, 'r', encoding='utf8', newline='') as csvfile:
            csvreader = csv.reader(csvfile)
            next(csvreader)
            for line in csvreader:
                csvrows.append(line)

    # write out the csv rows to a single denormalized csv file
    csv.register_dialect('dialect', quoting=csv.QUOTE_ALL,
                         skipinitialspace=True)
    with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as outFile:
        writer = csv.writer(outFile, dialect='dialect')
        writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName',
                         'length', 'level', 'location', 'sessionId', 'song', 'userId'])
        for row in csvrows:
            if row[0] == '':
                continue
            writer.writerow((row[0], row[2], row[3], row[4], row[5],
                             row[6], row[7], row[8], row[12], row[13], row[16]))
            
dir = os.getcwd() + '/event_data'
process_csv(dir, os.walk(dir).__next__()[2])

In [3]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [4]:
# This should make a connection to a Cassandra instance your local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster
cluster = Cluster()

# To establish connection and begin executing queries, need a session
session = cluster.connect()

#### Create Keyspace

In [5]:
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS sparkify
        WITH REPLICATION = {
           'class' : 'SimpleStrategy',
           'replication_factor' : 1
        };
    """)

<cassandra.cluster.ResultSet at 0x7f1e49498860>

#### Set Keyspace

In [6]:
session.execute('USE sparkify')

<cassandra.cluster.ResultSet at 0x7f1e49498208>

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




In [8]:
    session.execute("""
        CREATE TABLE IF NOT EXISTS sparkify.plays_by_session_and_item(
            session_id int,
            item_in_session int,
            artist text,
            song_title text,
            song_len float,
            PRIMARY KEY (session_id, item_in_session))
    """)

    session.execute("""
        CREATE TABLE IF NOT EXISTS sparkify.artist_song_user_by_user_id_session_id(
            session_id int,
            item_in_session int,
            user_id int,
            artist text,
            song_title text,
            user_first text,
            user_last text,
            PRIMARY KEY ((user_id, session_id), item_in_session))
    """)

    session.execute("""
        CREATE TABLE IF NOT EXISTS sparkify.user_first_last_by_song_listened_to(
            song_title text,
            user_id int,
            user_first text,
            user_last text,
            PRIMARY KEY (song_title, user_id))
    """)
    
    with open('event_datafile_new.csv', encoding='utf8') as file:
        csvreader = csv.reader(file)
        csvreader.__next__()
        for line in csvreader:
            session.execute("""
                INSERT INTO sparkify.plays_by_session_and_item(session_id, item_in_session, artist, song_title, song_len)
                VALUES ({session_id}, {item_in_session}, '{artist}', '{song_title}', {song_len})
            """.format(session_id=line[8], item_in_session=line[3], artist=line[0].replace("'", "''"), song_title=line[9].replace("'", "''"), song_len=line[5]))
            session.execute("""
                INSERT INTO sparkify.artist_song_user_by_user_id_session_id(session_id, item_in_session, user_id, artist, song_title, user_first, user_last)
                VALUES ({session_id}, {item_in_session}, {user_id}, '{artist}', '{song_title}', '{user_first}', '{user_last}')
            """.format(
                session_id=line[8],
                item_in_session=line[3],
                user_id=line[10],
                artist=line[0].replace("'", "''"),
                song_title=line[9].replace("'", "''"),
                user_first=line[1].replace("'", "''"),
                user_last=line[4].replace("'", "''")
            ))
            session.execute("""
                INSERT INTO sparkify.user_first_last_by_song_listened_to(song_title, user_id, user_first, user_last)
                VALUES ('{song_title}', {user_id}, '{user_first}', '{user_last}')
            """.format(
                song_title=line[9].replace("'", "''"),
                user_id=line[10],
                user_first=line[1].replace("'", "''"),
                user_last=line[4].replace("'", "''")
            ))

#### Do a SELECT to verify that the data have been inserted into each table

In [9]:
    rows = session.execute('SELECT * FROM sparkify.plays_by_session_and_item LIMIT 2')

    for row in rows:
        print(row)

Row(session_id=23, item_in_session=0, artist='Regina Spektor', song_len=191.08526611328125, song_title='The Calculation (Album Version)')
Row(session_id=23, item_in_session=1, artist='Octopus Project', song_len=250.95791625976562, song_title='All Of The Champs That Ever Lived')


### COPY AND REPEAT THE ABOVE THREE CELLS FOR EACH OF THE THREE QUESTIONS

In [10]:
# Get the artist, song and song length of the song play for session 338 with item in session 4

In [11]:
    rows = session.execute(
        'SELECT artist, song_title, song_len FROM sparkify.plays_by_session_and_item WHERE session_id=338 AND item_in_session=4')

    # We are only expecting one row here
    for row in rows:
        print({"artist": row[0], "song_title": row[1], "song_len": row[2]})
        
        


{'artist': 'Faithless', 'song_title': 'Music Matters (Mark Knight Dub)', 'song_len': 495.30731201171875}


In [12]:
# Get the artist and song along with the listener user's 
# first and last for the song play with listener user id 
# 10 and the session id is 182

In [13]:
    rows = session.execute("""
        SELECT artist, song_title, user_first, user_last
        FROM sparkify.artist_song_user_by_user_id_session_id
        WHERE session_id=182 AND user_id=10
        GROUP BY item_in_session
    """)

    for row in rows:
        print({"artist": row[0], "song_title": row[1], "user_first": row[2], "user_last": row[3]})
        

{'artist': 'Down To The Bone', 'song_title': "Keep On Keepin' On", 'user_first': 'Sylvie', 'user_last': 'Cruz'}
{'artist': 'Three Drives', 'song_title': 'Greece 2000', 'user_first': 'Sylvie', 'user_last': 'Cruz'}
{'artist': 'Sebastien Tellier', 'song_title': 'Kilometer', 'user_first': 'Sylvie', 'user_last': 'Cruz'}
{'artist': 'Lonnie Gordon', 'song_title': 'Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', 'user_first': 'Sylvie', 'user_last': 'Cruz'}


In [14]:
# Get the list of all users (first and last) that listed to the song 'All Hands Against His Own'

In [15]:
    rows = session.execute("""
        SELECT user_first, user_last
        FROM sparkify.user_first_last_by_song_listened_to
        WHERE song_title='All Hands Against His Own'
        GROUP BY user_id
    """)

    result_set = []
    for row in rows:
        print({"user_first": row[0], "user_last": row[1]})

{'user_first': 'Jacqueline', 'user_last': 'Lynch'}
{'user_first': 'Tegan', 'user_last': 'Levine'}
{'user_first': 'Sara', 'user_last': 'Johnson'}


### Drop the tables before closing out the sessions

In [16]:
session.execute('DROP TABLE IF EXISTS sparkify.plays_by_session_and_item')
session.execute('DROP TABLE IF EXISTS sparkify.artist_song_user_by_user_id_session_id')
session.execute('DROP TABLE IF EXISTS sparkify.user_first_last_by_song_listened_to')

<cassandra.cluster.ResultSet at 0x7f1e23f922b0>

### Close the session and cluster connection¶

In [17]:
session.shutdown()
cluster.shutdown()