# Parte I. ETL para pré-processamento dos arquivos.

#### Importação das bibliotecas:

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Criação de uma lista de filepaths para processar a tabela original em csv

In [2]:

# Consulta do diretório atual, adicionando a string correspondente ao subdiretório que contém os arquivos
filepath = os.getcwd() + '/event_data'

# Um loop para criar lista de arquivos e coletar seus endereços
for root, dirs, files in os.walk(filepath):    
    # Junção do diretório raiz com a variável filepath e o nome dos arquivos  
    file_path_list = glob.glob(os.path.join(root,'*'))
    #print(file_path_list)

#### Processamento dos arquivos para criar o csv que será usado pelas "tabelas" do Apache Cassandra

In [3]:
# Iniciando uma lista vazia das linhas que serão geradas a partir de cada arquivo
full_data_rows_list = [] 
    
# Para cada endereço (filepath) na lista de endereços dos arquivos (file_path_list) : 
for f in file_path_list:

# Lendo o arquivo csv:
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # criando um objeto csv reader:
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # Extraindo cada linha, uma a uma e anexando-as:
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            

# Criando um dialeto csv personalizado: 
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Criando uma amostra em csv que exclui algumas colunas e será usada para inserir os dados 
# nas tabelas do Cassandra:

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
# Conferindo o número de linhas do novo arquivo: 
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Parte II:

##  O arquivo event_datafile_new.csv contém as seguintes colunas:

- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId



## Inicialização e configuração do Cluster do Apache Cassandra:

#### Criando o  Cluster

In [7]:
# Estabelecendo uma conexão com uma instância Cassandra no host local:
from cassandra.cluster import Cluster
cluster = Cluster()

# Para estabelecer uma conexão e começar a executar as queries, precisamos de uma sessão:
session = cluster.connect()

#### Criando o Keyspace

In [21]:
'''
As propriedades da CQL keyspace terão os seguintes valores:

Classe Simple Strategy: É a estratégia de replicação onde todo o cluster tem o mesmo fator de replicação
Fator de Replicação 1 : O numero total e réplicas ao logo do cluster é igual a 1.

'''

try:
    session.execute("""CREATE KEYSPACE IF NOT EXISTS project2
    WITH REPLICATION =
    {'class' : 'SimpleStrategy', 'replication_factor' : 1}""")

except Exception as e:
    print(e)

#### Definindo o Keyspace

In [22]:
try:
    session.set_keyspace('project2')
except Exception as e:
    print(e)


## Testando o Banco de Dados:

####  Obs: No Apache Cassandra modelamos a família de colunas ("tabelas") com base nas requisições que desejamos rodar.

### 1. - Seleção do artista, música e duração no histórico de músicas que foram ouvidas na sessionID = 338

In [23]:
query = "CREATE TABLE IF NOT EXISTS songinfo_by_session_by_item"
query = query + """(
                    sessionId int,
                    iteminSession int,
                    artist text,
                    song text,
                    length float,
                    PRIMARY KEY (sessionId, iteminSession)
                    )"""
try:
    session.execute(query)
except Exception as e:
    print(e)

                    

In [24]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # pula cabeçalho
    for line in csvreader:

        query = "INSERT INTO songinfo_by_session_by_item (sessionId, iteminSession, artist, song,length)"
        query = query + "VALUES (%s,%s,%s,%s, %s)"
        ## Relaciona qual elemento da coluna deve ser relacionado com cada coluna na declaração INSERT
        session.execute(query, (int(line[8]), int(line[3]), str(line[0]), str(line[9]), float(line[5])))

#### Fazendo um SELECT para certificar que os dados foram inseridos em cada uma das tabelas:

In [25]:
query = "SELECT artist, song, length FROM project2.songinfo_by_session_by_item \
            WHERE sessionId = 338 AND iteminSession = 4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

for row in rows:
    print(row.artist, row.song, row.length)  

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


### 2. - Para userid = 10 e sessionid = 182, devolva o nome do artista, a música ordenada por itemInSession e o usuário, com primeiro e último nome

In [29]:
query = "CREATE TABLE IF NOT EXISTS songinfo_by_user_by_session"
query = query + """(
                    userId int,
                    sessionId int,
                    itemInSession int,
                    artist text,
                    song text,
                    firstName text,
                    lastName text,
                    PRIMARY KEY ((userId), sessionId, itemInSession))
                    WITH CLUSTERING ORDER BY (sessionId ASC, iteminSession ASC);"""
try:
    session.execute(query)
except Exception as e:
    print(e)

In [30]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # pula cabeçalho
    for line in csvreader:
        query1 = "INSERT INTO songinfo_by_user_by_session (userId, sessionId, iteminSession, artist, song, firstName, lastName)"
        query1 = query1 + " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query1, (int(line[10]),int(line[8]),int(line[3]), str(line[0]), str(line[9]), str(line[1]), str(line[4])))

In [31]:
query1 = "SELECT artist, song, firstName, lastName FROM songinfo_by_user_by_session \
            WHERE userId = 10 AND sessionId = 182"
try:
    rows = session.execute(query1)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist, row.song, row.firstname, row.lastname)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


### 3.- Imprima o nome e sobrenome de todos os usuários que ouviram a música "All Hands Against His Own" :

In [32]:
query2 = "CREATE TABLE IF NOT EXISTS userinfo_by_song"
query2 = query2 + """(userid int, firstName text, 
                        lastName text, song text, 
                        PRIMARY KEY ((song),userid)
                        )"""
try:
    session.execute(query2)
except Exception as e:
    print(e)

                    

In [33]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # pula cabeçalho
    for line in csvreader:
        query2 = "INSERT INTO userinfo_by_song (userid, firstName, lastName, song)"
        query2 = query2 + " VALUES (%s, %s, %s, %s)"
        session.execute(query2, (int(line[10]),str(line[1]),str(line[4]),str(line[9])))

In [34]:
query2 = "SELECT firstName, lastName FROM userinfo_by_song WHERE song = 'All Hands Against His Own'"
try:
    rows = session.execute(query2)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.firstname, row.lastname)

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Encerramento das tabelas

In [35]:
query = "drop table question1"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

query = "drop table question2"
try:
    rows = session.execute(query1)
except Exception as e:
    print(e)

    
query = "drop table question3"
try:
    rows = session.execute(query2)
except Exception as e:
    print(e)

### Finalização da sessão e da conexão com o cluster¶

In [36]:
session.shutdown()
cluster.shutdown()