# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import os
import glob
import pandas as pd
import numpy as np
import cassandra

#### Creating list of filepaths to process original event csv data files

In [2]:
# Write your code here
filepath= r"D:\youtube_event_data"
fileNames = []
for root, dirs, files in os.walk(filepath):
    files = glob.glob(os.path.join(root, '*.csv'))
    for name in files:
        current = os.path.abspath(name)
        fileNames.append(current)

#### Processing the CSV files to create the new `youtube_events_data_new.csv` file

In [3]:
# Write your code here
All_df = pd.DataFrame()
for currentFile in fileNames:
    df = pd.read_csv(currentFile)
#     print("before ", df.shape)
    All_df = All_df.append(df, ignore_index = True)
#     print("After ", All_df.shape)
    
All_df.shape

(8056, 17)

In [35]:
All_df.to_csv (r'youtube_events_data_new.csv', index = False, header=True)

In [4]:
# check the number of rows in your new csv file
with open('youtube_events_data_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

8057


In [5]:
All_df.head()

Unnamed: 0,youtuber,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,video,status,ts,userId
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540920000000.0,38,,200,1541110000000.0,39.0
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540340000000.0,139,,200,1541110000000.0,8.0
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextVideo,1540340000000.0,139,You Gotta Be,200,1541110000000.0,8.0
3,,Logged In,Kaylee,F,2,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Upgrade,1540340000000.0,139,,200,1541110000000.0,8.0
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextVideo,1540340000000.0,139,Flat 55,200,1541110000000.0,8.0


# Part II. Complete the Apache Cassandra coding portion of your project. 

**TODO: You have to complete the cell below with your `youtube_events_data_new.csv`columns and a screenshot of CSV file data**

## The new CSV file titled `youtube_events_data_new.csv`, The event_datafile_new.csv contains the following columns: 


youtuber
auth
firstName
gender
itemInSession
lastName
length
level
location
method
page
registration
sessionId
video
status
ts
userId


The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**youtube_events_data_new.csv**</font> after the code above is run:<br>

<img src="images/i.JPG">

#### Creating a Cluster

In [10]:
# Create a Cassandra Cluster
from cassandra.cluster import Cluster
try: 
    cluster = Cluster(['127.0.0.1']) #If you have a locally installed Apache Cassandra instance
    session = cluster.connect()
except Exception as e:
    print(e)

#### Create Keyspace `youtubedb`

In [11]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS youtubedb 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

#### Set Keyspace `youtubedb`

In [12]:
try:
    session.set_keyspace('youtubedb')
except Exception as e:
    print(e)

## Create queries to ask the following three questions of the data

1. Give me the youtuber, video title and video's length in the YouTube app history that was heard during  sessionId = 338, and itemInSession  = 4


2. Give me only the following: name of youtuber, video (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

3. Give me every user name (first and last) in my YouTube app history who watched the video 'All Hands Against His Own'

### Give me the youtuber, video title and video's length in the YouTube app history that was heard during sessionId = 338, and itemInSession = 4

In [13]:
query = "CREATE TABLE IF NOT EXISTS table1 "
query = query + "(sessionId varint, itemInSession varint, youtuber text, \
video text, length text, PRIMARY KEY (sessionId, itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [17]:
insert = "INSERT INTO table1 (sessionId, itemInSession, youtuber, video, length) VALUES (%s,%s,%s,%s,%s)"
for i in All_df.index:
    table1_data= (All_df.loc[i]['sessionId'],All_df.loc[i]['itemInSession'],str(All_df.loc[i]['youtuber']),str(All_df.loc[i]['video']),str(All_df.loc[i]['length']))
    session.execute(insert,table1_data)
print('inserted')
    

inserted


In [18]:
read = "SELECT * from table1 WHERE sessionId = 338 AND itemInSession = 4"
rows = session.execute(read)
for r in rows:
    print(r)

Row(sessionid=338, iteminsession=4, length='495.3073', video='Music Matters (Mark Knight Dub)', youtuber='Faithless')


### Give me only the following: name of youtuber, video (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [23]:
#################################################################
query = "CREATE TABLE IF NOT EXISTS table2 "
query = query + "(sessionId varint, userId text, itemInSession varint, youtuber text, video text, firstName text,\
lastName text, PRIMARY KEY ((userId, sessionId), itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [24]:
insert = insert = "INSERT INTO table2 (sessionId, userId,itemInSession, youtuber, video, firstName,lastName) VALUES (%s,%s,%s,%s,%s,%s,%s)"
for i in All_df.index:
    table2_data= (All_df.loc[i]['sessionId'],str(All_df.loc[i]['userId']),All_df.loc[i]['itemInSession'],str(All_df.loc[i]['youtuber']),str(All_df.loc[i]['video']),str(All_df.loc[i]['firstName']),str(All_df.loc[i]['lastName']))
    session.execute(insert,table2_data)
print('inserted')
    


inserted


In [25]:
query = "select itemInSession, youtuber, video, firstName, lastName from table2 \
WHERE userId = '10.0' AND sessionId =182"
rows = session.execute(query)
    
for r in rows:
    print (r)

Row(iteminsession=0, youtuber='Down To The Bone', video="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz')
Row(iteminsession=1, youtuber='Three Drives', video='Greece 2000', firstname='Sylvie', lastname='Cruz')
Row(iteminsession=2, youtuber='Sebastien Tellier', video='Kilometer', firstname='Sylvie', lastname='Cruz')
Row(iteminsession=3, youtuber='Lonnie Gordon', video='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz')


### Give me every user name (first and last) in my YouTube app history who watched the video 'All Hands Against His Own'

In [26]:
query = "CREATE TABLE IF NOT EXISTS table3 "
query = query + "(video text, userId text, sessionId varint, \
itemInSession varint, firstName text, lastName text,  PRIMARY KEY (video, userId))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [28]:
insert = insert = "INSERT INTO table3 (video,userId,sessionId ,itemInSession,firstName,lastName) VALUES (%s,%s,%s,%s,%s,%s)"
for i in All_df.index:
    table3_data= (str(All_df.loc[i]['video']),str(All_df.loc[i]['userId']),All_df.loc[i]['sessionId'],All_df.loc[i]['itemInSession'],str(All_df.loc[i]['firstName']),str(All_df.loc[i]['lastName']))
    session.execute(insert,table3_data)
print('inserted')

inserted


In [30]:
query = "SELECT firstName, lastName from table3 WHERE video='All Hands Against His Own'"
rows = session.execute(query)
    
for r in rows:
    print (r)

Row(firstname='Jacqueline', lastname='Lynch')
Row(firstname='Tegan', lastname='Levine')
Row(firstname='Sara', lastname='Johnson')


### Drop the tables before closing out the sessions

In [31]:
session.execute("drop table table1")
session.execute("drop table table2")
session.execute("drop table table3")

<cassandra.cluster.ResultSet at 0x27af5df7940>

### Close the session and cluster connection¶

In [32]:
session.shutdown()
cluster.shutdown()