# Parte 1: Pipeline ETL para processar os dados

### Importando libs

In [2]:
!pip install cassandra-driver



In [3]:
import pandas as pd
import cassandra
import re
import os
import json
import csv
import numpy as np
import glob

### Criando uma lista de caminhos para processar os arquivos originais e csv dos eventos

In [4]:
# Diretório atual
print(f'Current working directory: {os.getcwd()}')

# Diretório dos arquivos de eventos
filepath = os.getcwd() + '/event_data'

# Cria uma lista de arquivos e coleta cada caminho
for root, dirs, files in os.walk(filepath):
    # Junta cada caminho e seu diretório raíz e subdiretórios usando o glob
    file_path_list = glob.glob(os.path.join(root, '*'))

print(file_path_list[0])
print(len(file_path_list))

Current working directory: C:\Users\Gilberto\Desktop\data_science\data_engineering\projeto_02
C:\Users\Gilberto\Desktop\data_science\data_engineering\projeto_02/event_data\2018-11-01-events.csv
30


## Processando os arquivos em um csv que será utilizado nas tabelas do Cassandra

In [5]:
# Inicializando uma lista vazia que será preenchida com as linhas de cada arquivo
full_data_rows_list = []

# para cada caminho na nossa lista
for f in file_path_list:
    # lendo o arquivo csv
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        # cria um novo obj leitor de csv
        csvreader = csv.reader(csvfile)
        next(csvreader)
        
        # extraindo os dados de cada linha
        for line in csvreader:
            full_data_rows_list.append(line)
    
print(f'Total rows : {len(full_data_rows_list)}')
print(f'Sample data:\n {full_data_rows_list[:5]}')

# Criando um arquivo único de csv que será chamado pelas rotinas que transformarão ele em uma tabela do cassandra
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_data_processed.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length', 'level', 'location', 'sessionId', 'song', 'userId'])
    
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

Total rows : 8056
Sample data:
 [['', 'Logged In', 'Walter', 'M', '0', 'Frye', '', 'free', 'San Francisco-Oakland-Hayward, CA', 'GET', 'Home', '1.54092E+12', '38', '', '200', '1.54111E+12', '39'], ['', 'Logged In', 'Kaylee', 'F', '0', 'Summers', '', 'free', 'Phoenix-Mesa-Scottsdale, AZ', 'GET', 'Home', '1.54034E+12', '139', '', '200', '1.54111E+12', '8'], ["Des'ree", 'Logged In', 'Kaylee', 'F', '1', 'Summers', '246.30812', 'free', 'Phoenix-Mesa-Scottsdale, AZ', 'PUT', 'NextSong', '1.54034E+12', '139', 'You Gotta Be', '200', '1.54111E+12', '8'], ['', 'Logged In', 'Kaylee', 'F', '2', 'Summers', '', 'free', 'Phoenix-Mesa-Scottsdale, AZ', 'GET', 'Upgrade', '1.54034E+12', '139', '', '200', '1.54111E+12', '8'], ['Mr Oizo', 'Logged In', 'Kaylee', 'F', '3', 'Summers', '144.03873', 'free', 'Phoenix-Mesa-Scottsdale, AZ', 'PUT', 'NextSong', '1.54034E+12', '139', 'Flat 55', '200', '1.54111E+12', '8']]


In [6]:
# checando o numero de linhas no novo arquivo csv
with open('event_data_processed.csv', 'r', encoding='utf8') as f:
    print(sum(1 for line in f))

6821


## Agora podemos trabalhar com o arquivo de csv processado. Nele temos as seguintes colunas:

- artist
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId


### Criando um cluster

#### Docker
```shell
$ docker run --name cassandraDb -d -p 7199:7199 -p 7000:7000 -p 9042:9042 -p 9160:9160 -p 7001:7001 cassandra:3.11
```

In [7]:
from cassandra.cluster import Cluster

try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
    print('Connection Established!')
except Exception as e:
    print(f'Connection Failed. Error: {e}')


Connection Established!


### Criando um keyspace

In [8]:
keyspace_query = """CREATE KEYSPACE IF NOT EXISTS sparkify
    with REPLICATION =
    { 'class': 'SimpleStrategy', 'replication_factor': 1 }
"""

# criando o keyspace
try:
    session.execute(keyspace_query)
except Exception as e:
    print(f"Failed to create keyspace! Error : {e}")

### Escolhendo o Keyspace

In [9]:
session.set_keyspace('sparkify')

### Próximo passo:

Agora precisamos criar tabelas para executar as seguintes consultas. Lembre-se, com o Apache Cassandra, modelamos as tabelas do banco de dados nas consultas que queremos executar.
Abaixo estão as consultas após as quais construiremos o modelo de dados:

1. Forneça o artista, o título da música e a duração da música no histórico do aplicativo de música que foi ouvido durante `sessionId = 338` e `itemInSession = 4`.
2. Dê apenas o seguinte: nome do artista, música (classificada por `itemInSession`) e usuário (nome e sobrenome) para `userid = 10`, `sessionid = 182`.
3. Forneça todos os nomes de usuário (primeiro e último) no histórico do meu app de música que ouviram a música `'All Hands Against His Own'`.


## Query 1

Para a consulta 1, precisamos de uma forma de executar a consulta em sessionId e itemInSession. Portanto, nossa chave primária deve ter essas colunas. Podemos particionar os dados em sessionId.

Nossa consulta Select: `SELECT artist, song, length FROM session_item WHERE sessionId = 338 AND itemInSession = 4`.

Nossa chave primária será (sessionId, itemInSession), onde sessionId é a chave de partição e itemInSession é a coluna de cluster.
Colunas que incluímos na tabela:

In [10]:
created_query1 = """
CREATE TABLE IF NOT EXISTS session_item (
    artist text,
    song text,
    length float,
    sessionId int,
    itemInSession int,
    PRIMARY KEY (sessionId, itemInSession)
)
"""

try:
    session.execute(created_query1)
    print('Table created!')
except Exception as e:
    print(f'Table creation failed! Error {e}')

Table created!


In [11]:
file = 'event_data_processed.csv'

with open(file, encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # Pula o cabeçalho
    for line in csvreader:
        query = "INSERT INTO session_item (artist, song, length, sessionId, itemInSession) "
        query += " VALUES (%s, %s, %s, %s, %s) "
        session.execute(query, (line[0], line[10], float(line[5]), int(line[8]), int(line[3])))

### SELECT para verificar os dados que foram inseridos.

In [12]:
select_query1 = "SELECT artist, song, length FROM  session_item WHERE sessionId = 338 AND itemInSession = 4"

try:
    rows = session.execute(select_query1)
except Exception as e:
    print(e)

for row in rows:
    print(row)

Row(artist='Faithless', song='50', length=495.30731201171875)


## Query 2

Para a consulta 2, precisamos de uma maneira de executar a consulta em sessionId e userId. Além disso, precisamos dos dados classificados em itemInSession. Portanto, nossa chave primária deve ter essas colunas. Podemos particionar os dados em uma chave composta (sessionId, userId).

Nossa consulta Select: `SELECT artist, song, firstName, lastName FROM user_session WHERE sessionId = 182 AND userId = 10`

Nossa chave primária será ((sessionId, userId), itemInSession)), onde (sessionId, userId) é a chave de partição e itemInSession é a coluna de cluster.

Além disso, estamos usando a cláusula - WITH CLUSTERING ORDER BY (itemInSession ASC), para classificar nossos dados com base em itemInSession.

Colunas que incluímos na tabela: sessionId, userId, artist, song, firstName, lastName, itemInSession.

In [13]:
create_query2 = """
CREATE TABLE IF NOT EXISTS user_session (
    sessionId int, 
    userId int, 
    artist text, 
    song text, 
    firstName text, 
    lastName text, 
    itemInSession int, 
    PRIMARY KEY ((sessionId, userId), itemInSession)
) WITH CLUSTERING ORDER BY (itemInSession ASC)
"""

try: 
    session.execute(create_query2)
    print("Table Created!")
except Exception as e:
    print(f"Table creation failed! Error : {e}")

Table Created!


In [16]:
file = 'event_data_processed.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO user_session (sessionId, userId, artist, song, firstName, lastName, itemInSession) "
        query += " VALUES (%s, %s, %s, %s, %s, %s, %s) "
        session.execute(query, (int(line[8]), int(line[10]), line[0], line[9], line[1], line[4], int(line[3])))

In [17]:
select_query2 = "SELECT artist, song, firstName, lastName FROM  user_session where sessionId = 182 and userId = 10"

try:
    rows = session.execute(select_query2)
except Exception as e:
    print(e)

for row in rows:
    print(row)

Row(artist='Down To The Bone', song="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz')
Row(artist='Three Drives', song='Greece 2000', firstname='Sylvie', lastname='Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', firstname='Sylvie', lastname='Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz')


## Query 3

Para a consulta 3, precisamos de uma maneira de executar a consulta na música. Então, nossa chave primária deve ter música. Além disso, a consulta deve ser tal que não contenha usuários duplicados para uma música. Portanto, precisamos modelar os dados de forma que não permitamos usuários duplicados para uma música em nossa mesa. Isso pode ser conseguido incluindo userId em nossa chave primária.

Nossa consulta Select: `SELECT song, firstName, lastName FROM user_song WHERE song = 'All Hands Against His Own'`

Nossa chave primária será ((song), userId)), onde song é a chave de partição e userId é a coluna de cluster.

Colunas que incluímos na tabela: song, userId, firstName, lastName.

In [18]:
create_query3 = """
CREATE TABLE IF NOT EXISTS user_song (
    song text, 
    userId int, 
    firstName text, 
    lastName text, 
    PRIMARY KEY ((song), userId)
)"""

try: 
    session.execute(create_query3)
    print("Table Created!")
except Exception as e:
    print(f"Table creation failed! Error : {e}")

Table Created!


In [19]:
file = 'event_data_processed.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    
    for line in csvreader:
        query = "INSERT INTO user_song (song, userId, firstName, lastName)"
        query += " VALUES (%s, %s, %s, %s) "
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

In [20]:
select_query2 = "SELECT song, firstName, lastName FROM user_song WHERE song = 'All Hands Against His Own'"

try:
    rows = session.execute(select_query2)
except Exception as e:
    print(e)

for row in rows:
    print(row)

Row(song='All Hands Against His Own', firstname='Jacqueline', lastname='Lynch')
Row(song='All Hands Against His Own', firstname='Tegan', lastname='Levine')
Row(song='All Hands Against His Own', firstname='Sara', lastname='Johnson')


## Excluindo as tabelas antes de fechar a conexão

In [21]:
session.execute("DROP TABLE IF EXISTS sparkify.session_item")
session.execute("DROP TABLE IF EXISTS sparkify.user_session")
session.execute("DROP TABLE IF EXISTS sparkify.user_song")

OperationTimedOut: errors={'127.0.0.1:9042': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=127.0.0.1:9042

## Fechando conexão e a sessão

In [22]:
session.shutdown()
cluster.shutdown()

### 